diff --git a/indexer/src/kadena-server/config/graphql-types.ts b/indexer/src/kadena-server/config/graphql-types.ts index 8f576c04..c2d78546 100644 --- a/indexer/src/kadena-server/config/graphql-types.ts +++ b/indexer/src/kadena-server/config/graphql-types.ts @@ -1103,15 +1103,18 @@ export type SubscriptionEventsArgs = { minimumDepth?: InputMaybe; parametersFilter?: InputMaybe; qualifiedEventName: Scalars['String']['input']; + quantity?: InputMaybe; }; export type SubscriptionNewBlocksArgs = { chainIds?: InputMaybe>; + quantity?: InputMaybe; }; export type SubscriptionNewBlocksFromDepthArgs = { chainIds?: InputMaybe>; minimumDepth: Scalars['Int']['input']; + quantity?: InputMaybe; }; export type SubscriptionTransactionArgs = { @@ -3101,21 +3104,21 @@ export type SubscriptionResolvers< 'events', ParentType, ContextType, - RequireFields + RequireFields >; newBlocks?: SubscriptionResolver< Maybe>, 'newBlocks', ParentType, ContextType, - Partial + RequireFields >; newBlocksFromDepth?: SubscriptionResolver< Maybe>, 'newBlocksFromDepth', ParentType, ContextType, - RequireFields + RequireFields >; transaction?: SubscriptionResolver< Maybe, diff --git a/indexer/src/kadena-server/config/schema.graphql b/indexer/src/kadena-server/config/schema.graphql index 626f7008..a921f151 100644 --- a/indexer/src/kadena-server/config/schema.graphql +++ b/indexer/src/kadena-server/config/schema.graphql @@ -71,31 +71,32 @@ type PoolCharts { """ Volume data points """ - volume: [ChartDataPoint!]! + volume: [ChartDataPoint!]! @complexity(value: 1) """ TVL data points """ - tvl: [ChartDataPoint!]! + tvl: [ChartDataPoint!]! @complexity(value: 1) """ Fees data points """ - fees: [ChartDataPoint!]! + fees: [ChartDataPoint!]! @complexity(value: 1) } type Subscription { """ Subscribe to new blocks. """ - newBlocks(chainIds: [String!]): [Block!] + newBlocks(chainIds: [String!], quantity: Int = 20): [Block!] + @complexity(value: 1, multipliers: ["quantity"]) """ Listen for a transaction by request key. """ - transaction(chainId: String, requestKey: String!): Transaction + transaction(chainId: String, requestKey: String!): Transaction @complexity(value: 1) """ Listen for transactions by chain ID and minimum confirmation depth. """ - transactions(quantity: Int = 20): [Transaction!] + transactions(quantity: Int = 20): [Transaction!] @complexity(value: 1, multipliers: ["quantity"]) """ Listen for events by qualifiedName (e.g. `coin.TRANSFER`). @@ -109,18 +110,20 @@ type Subscription { minimumDepth: Int parametersFilter: String qualifiedEventName: String! - ): [Event!] + quantity: Int = 20 + ): [Event!] @complexity(value: 1, multipliers: ["quantity"]) """ Subscribe to new blocks from a specific depth. """ - newBlocksFromDepth(chainIds: [String!], minimumDepth: Int!): [Block!] + newBlocksFromDepth(chainIds: [String!], minimumDepth: Int!, quantity: Int = 20): [Block!] + @complexity(value: 1, multipliers: ["quantity"]) } type Query { """ Retrieve a block by hash. """ - block(hash: String!): Block @complexity(value: 5) + block(hash: String!): Block @complexity(value: 1) """ Retrieve blocks by chain and minimal depth. Default page size is 20. @@ -132,7 +135,7 @@ type Query { first: Int last: Int minimumDepth: Int! - ): QueryBlocksFromDepthConnection @complexity(value: 10, multipliers: ["first", "last"]) + ): QueryBlocksFromDepthConnection @complexity(value: 1, multipliers: ["first", "last"]) """ Retrieve blocks by chain and minimal height. Default page size is 20. @@ -148,7 +151,7 @@ type Query { first: Int last: Int startHeight: Int! - ): QueryBlocksFromHeightConnection! @complexity(value: 10, multipliers: ["first", "last"]) + ): QueryBlocksFromHeightConnection! @complexity(value: 1, multipliers: ["first", "last"]) """ Retrieve all completed blocks from a given height. Default page size is 20. @@ -167,7 +170,7 @@ type Query { first: Int heightCount: Int = 3 last: Int - ): QueryCompletedBlockHeightsConnection! @complexity(value: 10, multipliers: ["first", "last"]) + ): QueryCompletedBlockHeightsConnection! @complexity(value: 1, multipliers: ["first", "last"]) """ Retrieve events by qualifiedName (e.g. `coin.TRANSFER`). Default page size is 20. @@ -190,7 +193,7 @@ type Query { parametersFilter: String qualifiedEventName: String! requestKey: String - ): QueryEventsConnection! @complexity(value: 10, multipliers: ["first", "last"]) + ): QueryEventsConnection! @complexity(value: 1, multipliers: ["first", "last"]) """ Retrieve an fungible specific account by its name and fungible, such as coin. @@ -204,7 +207,7 @@ type Query { fungibleAccountsByPublicKey( fungibleName: String = "coin" publicKey: String! - ): [FungibleAccount!]! @complexity(value: 5, multipliers: ["length"]) + ): [FungibleAccount!]! @complexity(value: 1) """ Retrieve an account by its name and fungible, such as coin, on a specific chain. @@ -222,7 +225,7 @@ type Query { accountName: String! chainIds: [String!] fungibleName: String = "coin" - ): [FungibleChainAccount!] @complexity(value: 5, multipliers: ["length"]) + ): [FungibleChainAccount!] @complexity(value: 1) """ Retrieve a chain account by public key. @@ -231,7 +234,7 @@ type Query { chainId: String! fungibleName: String = "coin" publicKey: String! - ): [FungibleChainAccount!]! @complexity(value: 5, multipliers: ["length"]) + ): [FungibleChainAccount!]! @complexity(value: 1) """ Estimate the gas limit for one or more transactions. Throws an error when the transaction fails or is invalid. The input accepts a JSON object and based on the parameters passed it will determine what type of format it is and return the gas limit estimation. The following types are supported: @@ -247,8 +250,7 @@ type Query {   Example of the input needed for a type `code` query: `gasLimitEstimate(input: "{\"code\":\"(coin.details \\\"k:1234\\\")\",\"chainId\":\"3\"}")` """ - gasLimitEstimate(input: [String!]!): [GasLimitEstimation!]! - @complexity(value: 5, multipliers: ["length"]) + gasLimitEstimate(input: [String!]!): [GasLimitEstimation!]! @complexity(value: 1) """ Get the configuration of the graph. @@ -267,7 +269,7 @@ type Query { node(id: ID!): Node @complexity(value: 1) - nodes(ids: [ID!]!): [Node]! @complexity(value: 5, multipliers: ["length"]) + nodes(ids: [ID!]!): [Node]! @complexity(value: 1) """ Retrieve a non-fungible specific account by its name. @@ -283,8 +285,7 @@ type Query { """ Execute arbitrary Pact code via a local call without gas-estimation or signature-verification (e.g. (+ 1 2) or (coin.get-details )). """ - pactQuery(pactQuery: [PactQuery!]!): [PactQueryResponse!]! - @complexity(value: 5, multipliers: ["length"]) + pactQuery(pactQuery: [PactQuery!]!): [PactQueryResponse!]! @complexity(value: 1) """ Retrieve one transaction by its unique key. Throws an error if multiple transactions are found. @@ -310,7 +311,7 @@ type Query { minimumDepth: Int requestKey: String isCoinbase: Boolean - ): QueryTransactionsConnection! @complexity(value: 10, multipliers: ["first", "last"]) + ): QueryTransactionsConnection! @complexity(value: 1, multipliers: ["first", "last"]) """ Retrieve all transactions by a given public key. @@ -321,7 +322,7 @@ type Query { first: Int last: Int publicKey: String! - ): QueryTransactionsByPublicKeyConnection! @complexity(value: 10, multipliers: ["first", "last"]) + ): QueryTransactionsByPublicKeyConnection! @complexity(value: 1, multipliers: ["first", "last"]) """ Retrieve transfers. Default page size is 20. @@ -336,10 +337,10 @@ type Query { fungibleName: String last: Int requestKey: String - ): QueryTransfersConnection! @complexity(value: 10, multipliers: ["first", "last"]) + ): QueryTransfersConnection! @complexity(value: 1, multipliers: ["first", "last"]) tokens(after: String, before: String, first: Int, last: Int): QueryTokensConnection! - @complexity(value: 10, multipliers: ["first", "last"]) + @complexity(value: 1, multipliers: ["first", "last"]) """ Retrieve liquidity pools. Default page size is 20. @@ -351,7 +352,7 @@ type Query { last: Int orderBy: PoolOrderBy = TVL_USD_DESC protocolAddress: String - ): QueryPoolsConnection! @complexity(value: 10, multipliers: ["first", "last"]) + ): QueryPoolsConnection! @complexity(value: 1, multipliers: ["first", "last"]) """ Retrieve a specific pool by its ID. @@ -373,7 +374,7 @@ type Query { after: String last: Int before: String - ): PoolTransactionsConnection + ): PoolTransactionsConnection @complexity(value: 1, multipliers: ["first", "last"]) """ Get user's liquidity positions @@ -385,7 +386,7 @@ type Query { last: Int before: String orderBy: LiquidityPositionOrderBy = VALUE_USD_DESC - ): LiquidityPositionsConnection! + ): LiquidityPositionsConnection! @complexity(value: 1, multipliers: ["first", "last"]) """ Get DEX metrics including TVL, volume, and pool count @@ -403,17 +404,17 @@ type Query { Filter metrics by protocol address """ protocolAddress: String - ): DexMetrics! + ): DexMetrics! @complexity(value: 1) """ Get price for a specific token """ - tokenPrice(tokenAddress: String!, protocolAddress: String): TokenPrice + tokenPrice(tokenAddress: String!, protocolAddress: String): TokenPrice @complexity(value: 1) """ Get prices for all tokens in a protocol """ - tokenPrices(protocolAddress: String): [TokenPrice!]! + tokenPrices(protocolAddress: String): [TokenPrice!]! @complexity(value: 1) } """ @@ -428,7 +429,7 @@ type FungibleAccount implements Node { Example: 3 chain accounts = 5 × 3 = 15 complexity points. This pattern is ideal for array fields where complexity scales linearly with result size. """ - chainAccounts: [FungibleChainAccount!]! @complexity(value: 5, multipliers: ["length"]) + chainAccounts: [FungibleChainAccount!]! @complexity(value: 1) fungibleName: String! totalBalance: Decimal! """ @@ -442,14 +443,14 @@ type FungibleAccount implements Node { before: String first: Int last: Int - ): FungibleAccountTransactionsConnection! @complexity(value: 10, multipliers: ["first", "last"]) + ): FungibleAccountTransactionsConnection! @complexity(value: 1, multipliers: ["first", "last"]) transfers( after: String before: String first: Int last: Int - ): FungibleAccountTransfersConnection! @complexity(value: 10, multipliers: ["first", "last"]) + ): FungibleAccountTransfersConnection! @complexity(value: 1, multipliers: ["first", "last"]) } type FungibleAccountTransactionsConnection { @@ -481,8 +482,7 @@ type NonFungibleChainAccount implements Node { id: ID! accountName: String! chainId: String! - nonFungibleTokenBalances: [NonFungibleTokenBalance!]! - @complexity(value: 5, multipliers: ["length"]) + nonFungibleTokenBalances: [NonFungibleTokenBalance!]! @complexity(value: 1) """ Default page size is 20. Note that custom token related transactions are not included. @@ -493,6 +493,7 @@ type NonFungibleChainAccount implements Node { first: Int last: Int ): NonFungibleChainAccountTransactionsConnection! + @complexity(value: 1, multipliers: ["first", "last"]) } type QueryCompletedBlockHeightsConnection { @@ -615,7 +616,7 @@ type Block implements Node { weight: String! target: String! coinbase: String! - neighbors: [BlockNeighbor!]! @complexity(value: 5, multipliers: ["length"]) + neighbors: [BlockNeighbor!]! @complexity(value: 1) """ The proof of work hash. """ @@ -628,12 +629,13 @@ type Block implements Node { Default page size is 20. """ events(after: String, before: String, first: Int, last: Int): BlockEventsConnection! - @complexity(value: 10, multipliers: ["first", "last"]) + @complexity(value: 1, multipliers: ["first", "last"]) minerAccount: FungibleChainAccount! @complexity(value: 1) """ Default page size is 20. """ transactions(after: String, before: String, first: Int, last: Int): BlockTransactionsConnection! + @complexity(value: 1, multipliers: ["first", "last"]) } interface Node { @@ -715,7 +717,7 @@ type FungibleChainAccount implements Node { first: Int last: Int ): FungibleChainAccountTransactionsConnection! - @complexity(value: 10, multipliers: ["first", "last"]) + @complexity(value: 1, multipliers: ["first", "last"]) """ Default page size is 20. @@ -725,7 +727,7 @@ type FungibleChainAccount implements Node { before: String first: Int last: Int - ): FungibleChainAccountTransfersConnection! @complexity(value: 10, multipliers: ["first", "last"]) + ): FungibleChainAccountTransfersConnection! @complexity(value: 1, multipliers: ["first", "last"]) } type FungibleChainAccountTransactionsConnection { @@ -758,8 +760,8 @@ type Transaction { cmd: TransactionCommand! @complexity(value: 1) hash: String! result: TransactionInfo! @complexity(value: 1) - sigs: [TransactionSignature!]! @complexity(value: 5, multipliers: ["length"]) - orphanedTransactions: [Transaction] @complexity(value: 5, multipliers: ["length"]) + sigs: [TransactionSignature!]! @complexity(value: 1) + orphanedTransactions: [Transaction] @complexity(value: 1) } type Transaction implements Node { @@ -767,7 +769,7 @@ type Transaction implements Node { cmd: TransactionCommand! @complexity(value: 1) hash: String! result: TransactionInfo! @complexity(value: 1) - sigs: [TransactionSignature!]! @complexity(value: 5, multipliers: ["length"]) + sigs: [TransactionSignature!]! @complexity(value: 1) } """ @@ -781,7 +783,7 @@ type TransactionCommand { networkId: String! nonce: String! payload: TransactionPayload! @complexity(value: 1) - signers: [Signer!]! @complexity(value: 5, multipliers: ["length"]) + signers: [Signer!]! @complexity(value: 1) } """ @@ -897,9 +899,9 @@ type TransactionResult { before: String first: Int last: Int - ): TransactionResultTransfersConnection! @complexity(value: 10, multipliers: ["first", "last"]) + ): TransactionResultTransfersConnection! @complexity(value: 1, multipliers: ["first", "last"]) events(after: String, before: String, first: Int, last: Int): TransactionResultEventsConnection! - @complexity(value: 10, multipliers: ["first", "last"]) + @complexity(value: 1, multipliers: ["first", "last"]) } type TransactionResultEventsConnection { @@ -1085,10 +1087,9 @@ A non-fungible-specific account. """ type NonFungibleAccount implements Node { accountName: String! - chainAccounts: [NonFungibleChainAccount!]! @complexity(value: 5, multipliers: ["length"]) + chainAccounts: [NonFungibleChainAccount!]! @complexity(value: 1) id: ID! - nonFungibleTokenBalances: [NonFungibleTokenBalance!]! - @complexity(value: 5, multipliers: ["length"]) + nonFungibleTokenBalances: [NonFungibleTokenBalance!]! @complexity(value: 1) """ Default page size is 20. Note that custom token related transactions are not included. """ @@ -1097,8 +1098,7 @@ type NonFungibleAccount implements Node { before: String first: Int last: Int - ): NonFungibleAccountTransactionsConnection! - @complexity(value: 10, multipliers: ["first", "last"]) + ): NonFungibleAccountTransactionsConnection! @complexity(value: 1, multipliers: ["first", "last"]) } type NonFungibleAccountTransactionsConnection { @@ -1202,8 +1202,8 @@ A liquidity pool for a token pair. type Pool implements Node { id: ID! address: String! - token0: Token! - token1: Token! + token0: Token! @complexity(value: 1) + token1: Token! @complexity(value: 1) reserve0: String! reserve1: String! totalSupply: String! @@ -1223,7 +1223,7 @@ type Pool implements Node { """ Get chart data for this pool """ - charts(timeFrame: TimeFrame!): PoolCharts! + charts(timeFrame: TimeFrame!): PoolCharts! @complexity(value: 1) """ Get transactions for this pool """ @@ -1248,7 +1248,7 @@ type Pool implements Node { Cursor for pagination """ before: String - ): PoolTransactionsConnection @complexity(value: 10, multipliers: ["first"]) + ): PoolTransactionsConnection @complexity(value: 1, multipliers: ["first", "last"]) } type QueryPoolsConnection { @@ -1411,7 +1411,7 @@ type LiquidityPosition { walletAddress: String! valueUsd: Decimal! apr24h: Decimal! - pair: Pool! + pair: Pool! @complexity(value: 1) createdAt: DateTime! updatedAt: DateTime! } @@ -1442,11 +1442,11 @@ type DexMetrics { """ Historical TVL data points """ - tvlHistory: [ChartDataPoint!]! + tvlHistory: [ChartDataPoint!]! @complexity(value: 1) """ Historical volume data points """ - volumeHistory: [ChartDataPoint!]! + volumeHistory: [ChartDataPoint!]! @complexity(value: 1) """ Total volume in USD for the specified period """ @@ -1455,7 +1455,7 @@ type DexMetrics { type TokenPrice { id: ID! - token: Token! + token: Token! @complexity(value: 1) priceInKda: Decimal! priceInUsd: Decimal! protocolAddress: String! diff --git a/indexer/src/kadena-server/repository/application/block-repository.ts b/indexer/src/kadena-server/repository/application/block-repository.ts index 52968445..a89a83b1 100644 --- a/indexer/src/kadena-server/repository/application/block-repository.ts +++ b/indexer/src/kadena-server/repository/application/block-repository.ts @@ -24,6 +24,7 @@ export interface GetLatestBlocksParams { creationTime: number; lastBlockId?: number; chainIds?: string[]; + quantity: number; } export interface UpdateCanonicalStatusParams { @@ -77,6 +78,7 @@ export default interface BlockRepository { chainIds: string[], minimumDepth: number, startingTimestamp: number, + quantity: number, id?: string, ): Promise; diff --git a/indexer/src/kadena-server/repository/application/event-repository.ts b/indexer/src/kadena-server/repository/application/event-repository.ts index b2e13f28..557eae54 100644 --- a/indexer/src/kadena-server/repository/application/event-repository.ts +++ b/indexer/src/kadena-server/repository/application/event-repository.ts @@ -36,6 +36,7 @@ export interface GetEventParams { export interface GetLastEventsParams { qualifiedEventName: string; + quantity: number; lastEventId?: number; chainId?: string | null; minimumDepth?: number | null; diff --git a/indexer/src/kadena-server/repository/infra/repository/block-db-repository.ts b/indexer/src/kadena-server/repository/infra/repository/block-db-repository.ts index 4f493a5e..116c3a3a 100644 --- a/indexer/src/kadena-server/repository/infra/repository/block-db-repository.ts +++ b/indexer/src/kadena-server/repository/infra/repository/block-db-repository.ts @@ -715,14 +715,14 @@ export default class BlockDbRepository implements BlockRepository { * @returns Promise resolving to an array of recent blocks */ async getLatestBlocks(params: GetLatestBlocksParams): Promise { - const { creationTime, lastBlockId, chainIds = [] } = params; + const { creationTime, lastBlockId, chainIds = [], quantity } = params; const blocks = await BlockModel.findAll({ where: { ...(lastBlockId && { id: { [Op.gt]: lastBlockId } }), creationTime: { [Op.gt]: creationTime }, ...(chainIds.length && { chainId: { [Op.in]: chainIds } }), }, - limit: 100, + limit: quantity, order: [['id', 'DESC']], }); @@ -774,6 +774,7 @@ export default class BlockDbRepository implements BlockRepository { chainIdsParam: string[], minimumDepth: number, startingTimestamp: number, + quantity: number, id?: string, ): Promise { const chainIds = chainIdsParam.length ? chainIdsParam.map(Number) : await this.getChainIds(); @@ -802,11 +803,11 @@ export default class BlockDbRepository implements BlockRepository { `; if (id) { - queryParams.push(id); + queryParams.push(id, quantity); query += ` - AND id > $${queryParams.length} + AND id > $${queryParams.length - 1} ORDER BY id DESC - LIMIT 100 + LIMIT $${queryParams.length} `; } else { query += ` diff --git a/indexer/src/kadena-server/repository/infra/repository/event-db-repository.ts b/indexer/src/kadena-server/repository/infra/repository/event-db-repository.ts index 852a1546..d8b1f4b9 100644 --- a/indexer/src/kadena-server/repository/infra/repository/event-db-repository.ts +++ b/indexer/src/kadena-server/repository/infra/repository/event-db-repository.ts @@ -511,11 +511,12 @@ export default class EventDbRepository implements EventRepository { qualifiedEventName, lastEventId, chainId, + quantity, minimumDepth, }: GetLastEventsParams) { const queryParams = []; let conditions = ''; - let limitCondition = lastEventId ? 'LIMIT 5' : 'LIMIT 100'; + let limitCondition = lastEventId ? 'LIMIT 5' : `LIMIT ${quantity}`; const splitted = qualifiedEventName.split('.'); const name = splitted.pop() ?? ''; @@ -556,9 +557,7 @@ export default class EventDbRepository implements EventRepository { const { rows } = await rootPgPool.query(query, queryParams); - const events = rows - .map(e => eventValidator.validate(e)) - .sort((a, b) => Number(b.id) - Number(a.id)); + const events = rows.map(e => eventValidator.validate(e)); return events; } diff --git a/indexer/src/kadena-server/resolvers/subscription/events-subscription-resolver.ts b/indexer/src/kadena-server/resolvers/subscription/events-subscription-resolver.ts index ecc7719c..c2d91c46 100644 --- a/indexer/src/kadena-server/resolvers/subscription/events-subscription-resolver.ts +++ b/indexer/src/kadena-server/resolvers/subscription/events-subscription-resolver.ts @@ -26,21 +26,37 @@ import { buildEventOutput } from '../output/build-event-output'; * * @param context - Resolver context containing repositories and control signals * @param qualifiedEventName - The qualified name of events to monitor (module.name format) + * @param quantity - The number of events to fetch per poll * @param chainId - Optional chain ID to filter events by specific chain * @param minimumDepth - Optional minimum confirmation depth for events * @returns AsyncGenerator that yields arrays of new events as they are discovered */ -async function* iteratorFn( - context: ResolverContext, - qualifiedEventName: string, - chainId?: string | null, - minimumDepth?: number | null, -): AsyncGenerator { + +interface IteratorFnParams { + context: ResolverContext; + qualifiedEventName: string; + quantity: number; + chainId?: string | null; + minimumDepth?: number | null; +} + +async function* iteratorFn({ + context, + qualifiedEventName, + quantity, + chainId, + minimumDepth, +}: IteratorFnParams): AsyncGenerator { + if (quantity > 100) { + throw new Error('[ERROR][SUBSCRIPTION][PARAMS] Quantity must be less than 100.'); + } + let lastEventId = await context.eventRepository.getLastEventId(); while (context.signal) { const newEvents = await context.eventRepository.getLastEvents({ qualifiedEventName, lastEventId, + quantity, chainId, minimumDepth, }); @@ -65,8 +81,14 @@ async function* iteratorFn( * for chain ID and minimum confirmation depth. */ export const eventsSubscriptionResolver: SubscriptionResolvers['events'] = { + resolve: (payload: any) => payload, subscribe: (__root, args, context) => { - return iteratorFn(context, args.qualifiedEventName, args.chainId, args.minimumDepth); + return iteratorFn({ + context, + qualifiedEventName: args.qualifiedEventName, + quantity: args.quantity, + chainId: args.chainId, + minimumDepth: args.minimumDepth, + }); }, - resolve: (payload: any) => payload, }; diff --git a/indexer/src/kadena-server/resolvers/subscription/new-blocks-from-depth-subscription-resolver.ts b/indexer/src/kadena-server/resolvers/subscription/new-blocks-from-depth-subscription-resolver.ts index f02d5f84..50f87c97 100644 --- a/indexer/src/kadena-server/resolvers/subscription/new-blocks-from-depth-subscription-resolver.ts +++ b/indexer/src/kadena-server/resolvers/subscription/new-blocks-from-depth-subscription-resolver.ts @@ -11,16 +11,45 @@ import { ResolverContext } from '../../config/apollo-server-config'; import { SubscriptionResolvers } from '../../config/graphql-types'; import { BlockOutput } from '../../repository/application/block-repository'; -async function* iteratorFn( - chainIds: string[], - minimumDepth: number, - context: ResolverContext, -): AsyncGenerator { +/** + * AsyncGenerator function that continuously polls for new blocks + * + * This function creates a polling loop that checks for new blocks at regular intervals. + * It keeps track of the last seen block ID to avoid sending duplicate blocks, and uses + * a starting timestamp to limit results to blocks created after the subscription started. + * + * The function maintains its state between iterations, allowing it to effectively paginate + * through new blocks as they are created on the blockchain. + * Interface for the parameters of the iterator function + * @param context - The context of the resolver + * @param chainIds - The chain IDs to filter the blocks by + * @param quantity - The number of blocks to fetch per poll + * @param minimumDepth - The minimum depth of the blocks to fetch + * @returns AsyncGenerator that yields arrays of new blocks as they are discovered + */ +interface IteratorFnParams { + context: ResolverContext; + chainIds: string[]; + quantity: number; + minimumDepth: number; +} + +async function* iteratorFn({ + context, + chainIds, + quantity, + minimumDepth, +}: IteratorFnParams): AsyncGenerator { + if (quantity > 100) { + throw new Error('[ERROR][SUBSCRIPTION][PARAMS] Quantity must be less than 100.'); + } + const startingTimestamp = new Date().getTime() / 1000000; const blockResult = await context.blockRepository.getLastBlocksWithDepth( chainIds, minimumDepth, startingTimestamp, + quantity, ); let lastBlockId: string | undefined; @@ -35,6 +64,7 @@ async function* iteratorFn( chainIds, minimumDepth, startingTimestamp, + quantity, lastBlockId, ); @@ -66,6 +96,11 @@ export const newBlocksFromDepthSubscriptionResolver: SubscriptionResolvers payload, subscribe: (_root, args, context) => { - return iteratorFn(args.chainIds ?? [], args.minimumDepth, context); + return iteratorFn({ + context, + chainIds: args.chainIds ?? [], + quantity: args.quantity, + minimumDepth: args.minimumDepth, + }); }, }; diff --git a/indexer/src/kadena-server/resolvers/subscription/new-blocks-subscription-resolver.ts b/indexer/src/kadena-server/resolvers/subscription/new-blocks-subscription-resolver.ts index bd0f40cf..002898aa 100644 --- a/indexer/src/kadena-server/resolvers/subscription/new-blocks-subscription-resolver.ts +++ b/indexer/src/kadena-server/resolvers/subscription/new-blocks-subscription-resolver.ts @@ -21,14 +21,27 @@ import { buildBlockOutput } from '../output/build-block-output'; * The function maintains its state between iterations, allowing it to effectively paginate * through new blocks as they are created on the blockchain. * - * @param chainIds - Array of chain IDs to filter blocks by (empty array means all chains) * @param context - Resolver context containing repositories and control signals + * @param chainIds - Array of chain IDs to filter blocks by (empty array means all chains) + * @param quantity - The number of blocks to fetch per poll * @returns AsyncGenerator that yields arrays of new blocks as they are discovered */ -async function* iteratorFn( - chainIds: string[], - context: ResolverContext, -): AsyncGenerator { + +interface IteratorFnParams { + context: ResolverContext; + chainIds: string[]; + quantity: number; +} + +async function* iteratorFn({ + context, + chainIds, + quantity, +}: IteratorFnParams): AsyncGenerator { + if (quantity > 100) { + throw new Error('[ERROR][SUBSCRIPTION][PARAMS] Quantity must be less than 100.'); + } + const startingTimestamp = new Date().getTime() / 1000; let lastBlockId: number | undefined; @@ -38,6 +51,7 @@ async function* iteratorFn( creationTime: startingTimestamp, lastBlockId, chainIds, + quantity, }); if (newBlocks.length > 0) { @@ -59,8 +73,12 @@ async function* iteratorFn( * The resolver filters blocks by chain ID if specified, or returns blocks from all chains if no filter is provided. */ export const newBlocksSubscriptionResolver: SubscriptionResolvers['newBlocks'] = { + resolve: (payload: any) => payload, subscribe: (_root, args, context) => { - return iteratorFn(args.chainIds ?? [], context); + return iteratorFn({ + context, + chainIds: args.chainIds ?? [], + quantity: args.quantity, + }); }, - resolve: (payload: any) => payload, }; diff --git a/indexer/src/kadena-server/resolvers/subscription/transaction-subscription-resolver.ts b/indexer/src/kadena-server/resolvers/subscription/transaction-subscription-resolver.ts index 395f69d6..37f30a8b 100644 --- a/indexer/src/kadena-server/resolvers/subscription/transaction-subscription-resolver.ts +++ b/indexer/src/kadena-server/resolvers/subscription/transaction-subscription-resolver.ts @@ -22,16 +22,22 @@ import { buildTransactionOutput } from '../output/build-transaction-output'; * The function terminates after yielding the first result, effectively providing * a one-time notification when the transaction is added to the blockchain. * - * @param requestKey - The unique request key of the transaction to monitor * @param context - Resolver context containing repositories and control signals + * @param requestKey - The unique request key of the transaction to monitor * @param chainId - Optional chain ID to filter transactions by specific chain * @returns AsyncGenerator that yields the transaction when found */ -async function* iteratorFn( - requestKey: string, - context: ResolverContext, - chainId?: string | null, -): AsyncGenerator { +interface IteratorFnParams { + context: ResolverContext; + requestKey: string; + chainId?: string | null; +} + +async function* iteratorFn({ + requestKey, + context, + chainId, +}: IteratorFnParams): AsyncGenerator { while (context.signal) { const { edges } = await context.transactionRepository.getTransactions({ requestKey, @@ -66,8 +72,12 @@ async function* iteratorFn( */ export const transactionSubscriptionResolver: SubscriptionResolvers['transaction'] = { + resolve: (payload: any) => payload, subscribe: (_root, args, context) => { - return iteratorFn(args.requestKey, context, args.chainId); + return iteratorFn({ + context, + requestKey: args.requestKey, + chainId: args.chainId, + }); }, - resolve: (payload: any) => payload, }; diff --git a/indexer/src/kadena-server/resolvers/subscription/transactions-subscription-resolver.ts b/indexer/src/kadena-server/resolvers/subscription/transactions-subscription-resolver.ts index 719348d3..573aa298 100644 --- a/indexer/src/kadena-server/resolvers/subscription/transactions-subscription-resolver.ts +++ b/indexer/src/kadena-server/resolvers/subscription/transactions-subscription-resolver.ts @@ -20,16 +20,15 @@ import { TransactionOutput } from '@/kadena-server/repository/application/transa * sending duplicate transactions to subscribers. * * @param context - Resolver context containing repositories and control signals - * @param chainId - Optional chain ID to filter transactions by specific chain - * @param minimumDepth - Optional minimum confirmation depth for transactions + * @param quantity - The number of transactions to fetch per poll * @returns AsyncGenerator that yields arrays of new transactions as they are discovered */ async function* iteratorFn( - quantity: number, context: ResolverContext, + quantity: number, ): AsyncGenerator { if (quantity > 100) { - throw new Error('Quantity must be less than 100.'); + throw new Error('[ERROR][SUBSCRIPTION][PARAMS] Quantity must be less than 100.'); } let hasError = false; @@ -45,7 +44,7 @@ async function* iteratorFn( } } catch (error) { hasError = true; - console.error('Error getting last transactions:', error); + console.error('[ERROR][DB][DATA_CORRUPT] Error getting last transactions:', error); } } @@ -60,8 +59,8 @@ async function* iteratorFn( */ export const transactionsSubscriptionResolver: SubscriptionResolvers['transactions'] = { + resolve: (payload: any) => payload, subscribe: (__root, args, context) => { - return iteratorFn(args.quantity, context); + return iteratorFn(context, args.quantity); }, - resolve: (payload: any) => payload, }; diff --git a/indexer/src/kadena-server/server.ts b/indexer/src/kadena-server/server.ts index 7d58c6f0..bb8831e8 100644 --- a/indexer/src/kadena-server/server.ts +++ b/indexer/src/kadena-server/server.ts @@ -32,7 +32,15 @@ import { createGraphqlContext, ResolverContext } from './config/apollo-server-co import { WebSocketServer } from 'ws'; import { useServer } from 'graphql-ws/lib/use/ws'; import { makeExecutableSchema } from '@graphql-tools/schema'; -import { ArgumentNode, ASTNode, GraphQLError, Kind } from 'graphql'; +import { + ArgumentNode, + ASTNode, + GraphQLError, + Kind, + specifiedRules, + parse, + validate, +} from 'graphql'; import initCache from '../cache/init'; import { getRequiredEnvString, getRequiredArrayEnvString } from '../utils/helpers'; @@ -259,12 +267,7 @@ export async function startGraphqlServer() { introspection: true, validationRules: [ depthLimit({ - maxDepth: 15, // Reasonable depth for most queries - maxListDepth: 8, // Prevent deeply nested array queries - maxSelfReferentialDepth: 3, // Limit recursive queries - maxIntrospectionDepth: 15, // Limit introspection query depth - maxIntrospectionListDepth: 8, // Limit introspection array depth - maxIntrospectionSelfReferentialDepth: 3, + maxDepth: 9, revealDetails: false, // Don't expose limits to clients }), ], @@ -300,31 +303,16 @@ export async function startGraphqlServer() { * More documentation can be found at https://github.com/ivome/graphql-query-complexity */ const complexity = getComplexity({ - // GraphQL schema schema, - // To calculate query complexity properly, - // check only the requested operation - // not the whole document that may contains multiple operations operationName: request.operationName, - // GraphQL query document query: document, - // GraphQL query variables variables: request.variables, - // Add any number of estimators. The estimators are invoked in order, the first - // numeric value that is being returned by an estimator is used as the field complexity - // If no estimator returns a value, an exception is raised estimators: [ - // Using fieldExtensionsEstimator is mandatory to make it work with type-graphql fieldExtensionsEstimator(), - // Add directive support directiveEstimator({ - // Optionally change the name of the directive here... Default value is `complexity` name: 'complexity', }), - // Add more estimators here... - // This will assign each field a complexity of 1 - // if no other estimator returned a value - simpleEstimator({ defaultComplexity: 1 }), + simpleEstimator({ defaultComplexity: 0 }), ], }); @@ -332,7 +320,7 @@ export async function startGraphqlServer() { // like compare it with max and throw error when the threshold is reached if (complexity > MAX_COMPLEXITY) { throw new Error( - `Sorry, too complicated query! Exceeded the maximum allowed complexity.`, + 'Sorry, too complicated query! Exceeded the maximum allowed complexity.', ); } }, @@ -389,6 +377,35 @@ export async function startGraphqlServer() { signal: abortController.signal, // Pass signal per subscription }; }, + onSubscribe: (_ctx, msg) => { + const { operationName, query, variables } = msg.payload; + + const document = parse(query); + const errors = validate(schema, document, specifiedRules); + if (errors.length > 0) return errors; + + const complexity = getComplexity({ + schema, + operationName: operationName ?? undefined, + query: document, + variables: variables ?? {}, + estimators: [ + fieldExtensionsEstimator(), + directiveEstimator({ + name: 'complexity', + }), + simpleEstimator({ defaultComplexity: 0 }), + ], + }); + + if (complexity > MAX_COMPLEXITY) { + return [ + new GraphQLError( + 'Sorry, too complicated query! Exceeded the maximum allowed complexity.', + ), + ]; + } + }, // Add connection lifecycle hooks for tracking onConnect: ctx => { const ip = ctx.extra.request.socket.remoteAddress || 'unknown'; @@ -396,7 +413,7 @@ export async function startGraphqlServer() { console.log('New connection -> ', ip, 'Total connections opened:', activeConnections); return true; // Allow the connection }, - onDisconnect: (ctx, code, reason) => { + onDisconnect: ctx => { const ip = ctx.extra.request.socket.remoteAddress || 'unknown'; activeConnections--; console.log('Closed connection -> ', ip, 'Total connections opened:', activeConnections);