diff --git a/indexer/src/index.ts b/indexer/src/index.ts index 239a96a8..bd355e8a 100644 --- a/indexer/src/index.ts +++ b/indexer/src/index.ts @@ -9,12 +9,15 @@ import { useKadenaGraphqlServer } from "./kadena-server/server"; import { closeDatabase } from "./config/database"; import { initializeDatabase } from "./config/init"; import { startGuardsBackfill } from "./services/sync/guards"; +import { startBackfillCoinbaseTransactions } from "./services/sync/coinbase"; program .option("-s, --streaming", "Start streaming blockchain data") .option("-g, --oldGraphql", "Start GraphQL server based on Postgraphile") .option("-t, --graphql", "Start GraphQL server based on kadena schema") .option("-f, --guards", "Backfill the guards") + // this option shouldn't be used if you initialize the indexer from the beginning + .option("-c, --coinbase", "Backfill coinbase transactions") .option("-z, --database", "Init the database"); program.parse(process.argv); @@ -37,6 +40,8 @@ async function main() { await startStreaming(); } else if (options.guards) { await startGuardsBackfill(); + } else if (options.coinbase) { + await startBackfillCoinbaseTransactions(); } else if (options.oldGraphql) { await usePostgraphile(); } else if (options.graphql) { diff --git a/indexer/src/kadena-server/config/apollo-server-config.ts b/indexer/src/kadena-server/config/apollo-server-config.ts index 54a287e9..c675583e 100644 --- a/indexer/src/kadena-server/config/apollo-server-config.ts +++ b/indexer/src/kadena-server/config/apollo-server-config.ts @@ -40,6 +40,7 @@ export type ResolverContext = { mempoolGateway: MempoolGateway; pactGateway: PactGateway; pubSub: PubSub; + signal: AbortSignal; }; export const createGraphqlContext = () => { @@ -56,6 +57,7 @@ export const createGraphqlContext = () => { mempoolGateway: new MempoolApiGateway(), pactGateway: new PactApiGateway(), pubSub: publishSubscribe, + signal: new AbortController().signal, }; return Promise.resolve({ diff --git a/indexer/src/kadena-server/config/graphql-types.ts b/indexer/src/kadena-server/config/graphql-types.ts index cb792746..671bb249 100644 --- a/indexer/src/kadena-server/config/graphql-types.ts +++ b/indexer/src/kadena-server/config/graphql-types.ts @@ -472,8 +472,13 @@ export type Query = { fungibleAccount?: Maybe; /** Retrieve an account by public key. */ fungibleAccountsByPublicKey: Array; + /** + * Retrieve an account by its name and fungible, such as coin, on a specific chain. + * @deprecated deprecated, use Query.fungibleChainAccounts + */ + fungibleChainAccount?: Maybe; /** Retrieve an account by its name and fungible, such as coin, on a specific chain. */ - fungibleChainAccounts: Array; + fungibleChainAccounts?: Maybe>; /** Retrieve a chain account by public key. */ fungibleChainAccountsByPublicKey: Array; /** @@ -585,6 +590,13 @@ export type QueryFungibleAccountsByPublicKeyArgs = { }; +export type QueryFungibleChainAccountArgs = { + accountName: Scalars['String']['input']; + chainId: Scalars['String']['input']; + fungibleName?: InputMaybe; +}; + + export type QueryFungibleChainAccountsArgs = { accountName: Scalars['String']['input']; chainIds?: InputMaybe>; @@ -762,6 +774,16 @@ export type QueryTransfersConnectionEdge = { node: Transfer; }; +/** DEPRECATED: a fallthrough IGuard type to cover non-KeysetGuard types. */ +export type RawGuard = IGuard & { + __typename?: 'RawGuard'; + /** @deprecated deprecated, use KeysetGuard.keys */ + keys: Array; + /** @deprecated deprecated, use KeysetGuard.predicate */ + predicate: Scalars['String']['output']; + raw: Scalars['String']['output']; +}; + /** A signer for a specific transaction. */ export type Signer = Node & { __typename?: 'Signer'; @@ -884,6 +906,11 @@ export type TransactionResult = { gas: Scalars['BigInt']['output']; /** The transaction result when it was successful. Formatted as raw JSON. */ goodResult?: Maybe; + /** + * The height of the block this transaction belongs to. + * @deprecated Use `block.height` instead. + */ + height: Scalars['BigInt']['output']; /** Identifier to retrieve the logs for the execution of the transaction. */ logs?: Maybe; /** @deprecated Not used. */ @@ -968,6 +995,17 @@ export type Transfer = Node & { transaction?: Maybe; }; +export type UserGuard = IGuard & { + __typename?: 'UserGuard'; + args: Array; + fun: Scalars['String']['output']; + /** @deprecated deprecated, use KeysetGuard.keys */ + keys: Array; + /** @deprecated deprecated, use KeysetGuard.predicate */ + predicate: Scalars['String']['output']; + raw: Scalars['String']['output']; +}; + export type ResolverTypeWrapper = Promise | T; @@ -1043,7 +1081,7 @@ export type ResolversUnionTypes<_RefType extends Record> = { /** Mapping of interface types */ export type ResolversInterfaceTypes<_RefType extends Record> = { - IGuard: ( KeysetGuard ); + IGuard: ( KeysetGuard ) | ( RawGuard ) | ( UserGuard ); Node: ( Omit & { events: _RefType['BlockEventsConnection'], minerAccount: _RefType['FungibleChainAccount'], parent?: Maybe<_RefType['Block']>, transactions: _RefType['BlockTransactionsConnection'] } ) | ( Omit & { block: _RefType['Block'], transaction?: Maybe<_RefType['Transaction']> } ) | ( Omit & { chainAccounts: Array<_RefType['FungibleChainAccount']>, transactions: _RefType['FungibleAccountTransactionsConnection'], transfers: _RefType['FungibleAccountTransfersConnection'] } ) | ( Omit & { guard: _RefType['IGuard'], transactions: _RefType['FungibleChainAccountTransactionsConnection'], transfers: _RefType['FungibleChainAccountTransfersConnection'] } ) | ( Omit & { chainAccounts: Array<_RefType['NonFungibleChainAccount']>, nonFungibleTokenBalances: Array<_RefType['NonFungibleTokenBalance']>, transactions: _RefType['NonFungibleAccountTransactionsConnection'] } ) | ( Omit & { nonFungibleTokenBalances: Array<_RefType['NonFungibleTokenBalance']>, transactions: _RefType['NonFungibleChainAccountTransactionsConnection'] } ) | ( Omit & { guard: _RefType['IGuard'] } ) | ( Signer ) | ( Omit & { cmd: _RefType['TransactionCommand'], orphanedTransactions?: Maybe>>, result: _RefType['TransactionInfo'] } ) | ( Omit & { block: _RefType['Block'], crossChainTransfer?: Maybe<_RefType['Transfer']>, transaction?: Maybe<_RefType['Transaction']> } ); }; @@ -1109,6 +1147,7 @@ export type ResolversTypes = { QueryTransactionsConnectionEdge: ResolverTypeWrapper & { node: ResolversTypes['Transaction'] }>; QueryTransfersConnection: ResolverTypeWrapper & { edges: Array }>; QueryTransfersConnectionEdge: ResolverTypeWrapper & { node: ResolversTypes['Transfer'] }>; + RawGuard: ResolverTypeWrapper; Signer: ResolverTypeWrapper; String: ResolverTypeWrapper; Subscription: ResolverTypeWrapper<{}>; @@ -1126,6 +1165,7 @@ export type ResolversTypes = { TransactionResultTransfersConnectionEdge: ResolverTypeWrapper & { node: ResolversTypes['Transfer'] }>; TransactionSignature: ResolverTypeWrapper; Transfer: ResolverTypeWrapper & { block: ResolversTypes['Block'], crossChainTransfer?: Maybe, transaction?: Maybe }>; + UserGuard: ResolverTypeWrapper; }; /** Mapping between all available schema types and the resolvers parents */ @@ -1190,6 +1230,7 @@ export type ResolversParentTypes = { QueryTransactionsConnectionEdge: Omit & { node: ResolversParentTypes['Transaction'] }; QueryTransfersConnection: Omit & { edges: Array }; QueryTransfersConnectionEdge: Omit & { node: ResolversParentTypes['Transfer'] }; + RawGuard: RawGuard; Signer: Signer; String: Scalars['String']['output']; Subscription: {}; @@ -1207,6 +1248,7 @@ export type ResolversParentTypes = { TransactionResultTransfersConnectionEdge: Omit & { node: ResolversParentTypes['Transfer'] }; TransactionSignature: TransactionSignature; Transfer: Omit & { block: ResolversParentTypes['Block'], crossChainTransfer?: Maybe, transaction?: Maybe }; + UserGuard: UserGuard; }; export interface BigIntScalarConfig extends GraphQLScalarTypeConfig { @@ -1403,7 +1445,7 @@ export type GraphConfigurationResolvers = { - __resolveType: TypeResolveFn<'KeysetGuard', ParentType, ContextType>; + __resolveType: TypeResolveFn<'KeysetGuard' | 'RawGuard' | 'UserGuard', ParentType, ContextType>; keys?: Resolver, ParentType, ContextType>; predicate?: Resolver; raw?: Resolver; @@ -1527,7 +1569,8 @@ export type QueryResolvers>; fungibleAccount?: Resolver, ParentType, ContextType, RequireFields>; fungibleAccountsByPublicKey?: Resolver, ParentType, ContextType, RequireFields>; - fungibleChainAccounts?: Resolver, ParentType, ContextType, RequireFields>; + fungibleChainAccount?: Resolver, ParentType, ContextType, RequireFields>; + fungibleChainAccounts?: Resolver>, ParentType, ContextType, RequireFields>; fungibleChainAccountsByPublicKey?: Resolver, ParentType, ContextType, RequireFields>; gasLimitEstimate?: Resolver, ParentType, ContextType, RequireFields>; graphConfiguration?: Resolver; @@ -1632,6 +1675,13 @@ export type QueryTransfersConnectionEdgeResolvers; }; +export type RawGuardResolvers = { + keys?: Resolver, ParentType, ContextType>; + predicate?: Resolver; + raw?: Resolver; + __isTypeOf?: IsTypeOfResolverFn; +}; + export type SignerResolvers = { address?: Resolver, ParentType, ContextType>; clist?: Resolver, ParentType, ContextType>; @@ -1705,6 +1755,7 @@ export type TransactionResultResolvers>; gas?: Resolver; goodResult?: Resolver, ParentType, ContextType>; + height?: Resolver; logs?: Resolver, ParentType, ContextType>; metadata?: Resolver; transactionId?: Resolver, ParentType, ContextType>; @@ -1762,6 +1813,15 @@ export type TransferResolvers; }; +export type UserGuardResolvers = { + args?: Resolver, ParentType, ContextType>; + fun?: Resolver; + keys?: Resolver, ParentType, ContextType>; + predicate?: Resolver; + raw?: Resolver; + __isTypeOf?: IsTypeOfResolverFn; +}; + export type Resolvers = { BigInt?: GraphQLScalarType; Block?: BlockResolvers; @@ -1817,6 +1877,7 @@ export type Resolvers = { QueryTransactionsConnectionEdge?: QueryTransactionsConnectionEdgeResolvers; QueryTransfersConnection?: QueryTransfersConnectionResolvers; QueryTransfersConnectionEdge?: QueryTransfersConnectionEdgeResolvers; + RawGuard?: RawGuardResolvers; Signer?: SignerResolvers; Subscription?: SubscriptionResolvers; Transaction?: TransactionResolvers; @@ -1833,5 +1894,6 @@ export type Resolvers = { TransactionResultTransfersConnectionEdge?: TransactionResultTransfersConnectionEdgeResolvers; TransactionSignature?: TransactionSignatureResolvers; Transfer?: TransferResolvers; + UserGuard?: UserGuardResolvers; }; diff --git a/indexer/src/kadena-server/config/schema.graphql b/indexer/src/kadena-server/config/schema.graphql index 9e32a132..e2a04550 100644 --- a/indexer/src/kadena-server/config/schema.graphql +++ b/indexer/src/kadena-server/config/schema.graphql @@ -135,6 +135,16 @@ type Query { publicKey: String! ): [FungibleAccount!]! + """ + Retrieve an account by its name and fungible, such as coin, on a specific chain. + """ + fungibleChainAccount( + accountName: String! + chainId: String! + fungibleName: String = "coin" + ): FungibleChainAccount + @deprecated(reason: "deprecated, use Query.fungibleChainAccounts") + """ Retrieve an account by its name and fungible, such as coin, on a specific chain. """ @@ -142,7 +152,7 @@ type Query { accountName: String! chainIds: [String!] fungibleName: String = "coin" - ): [FungibleChainAccount!]! + ): [FungibleChainAccount!] """ Retrieve a chain account by public key. @@ -732,6 +742,11 @@ type TransactionResult { logs: String transactionId: BigInt + """ + The height of the block this transaction belongs to. + """ + height: BigInt! @deprecated(reason: "Use `block.height` instead.") + metadata: String! @deprecated(reason: "Not used.") block: Block! @@ -1021,3 +1036,22 @@ type KeysetGuard implements IGuard { predicate: String! raw: String! } + +type UserGuard implements IGuard { + args: [String!]! + fun: String! + keys: [String!]! @deprecated(reason: "deprecated, use KeysetGuard.keys") + predicate: String! + @deprecated(reason: "deprecated, use KeysetGuard.predicate") + raw: String! +} + +""" +DEPRECATED: a fallthrough IGuard type to cover non-KeysetGuard types. +""" +type RawGuard implements IGuard { + keys: [String!]! @deprecated(reason: "deprecated, use KeysetGuard.keys") + predicate: String! + @deprecated(reason: "deprecated, use KeysetGuard.predicate") + raw: String! +} diff --git a/indexer/src/kadena-server/domain/transaction/transaction-service.ts b/indexer/src/kadena-server/domain/transaction/transaction-service.ts deleted file mode 100644 index 33a287d3..00000000 --- a/indexer/src/kadena-server/domain/transaction/transaction-service.ts +++ /dev/null @@ -1,40 +0,0 @@ -import { TransactionOutput } from "../../repository/application/transaction-repository"; - -const BLOCK_DEPTH = 6; - -export const getBaselineAndOrphanedTransactions = ( - transactions: TransactionOutput[], - maxHeight: number, -): { - baselineTransaction: TransactionOutput | null; - orphanedTransactions: TransactionOutput[]; -} => { - if (transactions.length === 0) { - return { baselineTransaction: null, orphanedTransactions: [] }; - } - - const baselineTransaction = transactions.filter( - (transaction) => maxHeight - transaction.blockHeight > BLOCK_DEPTH, - ); - - const orphanedTransactions = transactions.filter( - (transaction) => maxHeight - transaction.blockHeight <= BLOCK_DEPTH, - ); - - if (baselineTransaction.length > 1) { - return { - baselineTransaction: baselineTransaction[0], - orphanedTransactions: [ - ...baselineTransaction.slice(1), - ...orphanedTransactions, - ], - }; - } - - if (!baselineTransaction) { - const [first, ...rest] = orphanedTransactions; - return { baselineTransaction: first, orphanedTransactions: rest }; - } - - return { baselineTransaction: baselineTransaction[0], orphanedTransactions }; -}; diff --git a/indexer/src/kadena-server/repository/application/block-repository.ts b/indexer/src/kadena-server/repository/application/block-repository.ts index 4b595941..600a5b62 100644 --- a/indexer/src/kadena-server/repository/application/block-repository.ts +++ b/indexer/src/kadena-server/repository/application/block-repository.ts @@ -6,6 +6,7 @@ import { } from "../../config/graphql-types"; import { PaginationsParams } from "../pagination"; import { ConnectionEdge } from "../types"; +import { TransactionOutput } from "./transaction-repository"; export interface GetBlocksFromDepthParams extends PaginationsParams { chainIds?: InputMaybe; @@ -24,6 +25,12 @@ export interface GetBlocksBetweenHeightsParams extends PaginationsParams { endHeight?: InputMaybe; } +export interface GetLatestBlocksParams { + creationTime: number; + lastBlockId?: number; + chainIds?: string[]; +} + export type BlockOutput = Omit< Block, "parent" | "events" | "minerAccount" | "transactions" @@ -61,6 +68,12 @@ export default interface BlockRepository { getTotalCountOfBlockEvents(blockHash: string): Promise; + getLatestBlocks(params: GetLatestBlocksParams): Promise; + + getTransactionsOrderedByBlockDepth( + transactions: TransactionOutput[], + ): Promise; + // dataloader getBlocksByEventIds(eventIds: string[]): Promise; getBlocksByTransactionIds(transactionIds: string[]): Promise; diff --git a/indexer/src/kadena-server/repository/application/event-repository.ts b/indexer/src/kadena-server/repository/application/event-repository.ts index 236d3aa2..66f7fae3 100644 --- a/indexer/src/kadena-server/repository/application/event-repository.ts +++ b/indexer/src/kadena-server/repository/application/event-repository.ts @@ -35,6 +35,13 @@ export interface GetEventParams { requestKey: string; } +export interface GetLastEventsParams { + qualifiedEventName: string; + lastEventId?: number; + chainId?: string | null; + minimumDepth?: number | null; +} + export default interface EventRepository { getEvent(params: GetEventParams): Promise; getBlockEvents(params: GetBlockEventsParams): Promise<{ @@ -51,7 +58,9 @@ export default interface EventRepository { }>; getTotalEventsCount(hash: GetTotalEventsCount): Promise; getTotalTransactionEventsCount( - hash: GetTotalTransactionEventsCount + hash: GetTotalTransactionEventsCount, ): Promise; getTotalCountOfBlockEvents(hash: string): Promise; + getLastEventId(): Promise; + getLastEvents(params: GetLastEventsParams): Promise; } diff --git a/indexer/src/kadena-server/repository/infra/repository/balance-db-repository.ts b/indexer/src/kadena-server/repository/infra/repository/balance-db-repository.ts index 9495be9d..33acfed9 100644 --- a/indexer/src/kadena-server/repository/infra/repository/balance-db-repository.ts +++ b/indexer/src/kadena-server/repository/infra/repository/balance-db-repository.ts @@ -21,6 +21,7 @@ import { fungibleChainAccountValidator } from "../schema-validator/fungible-chai import { nonFungibleTokenBalanceValidator } from "../schema-validator/non-fungible-token-balance-validator"; export default class BalanceDbRepository implements BalanceRepository { + // TODO: waiting for orphan blocks mechanism to be ready async getAccountInfo(accountName: string, fungibleName = "coin") { const account = await BalanceModel.findOne({ where: { @@ -51,6 +52,7 @@ export default class BalanceDbRepository implements BalanceRepository { return { ...accountInfo, totalBalance }; } + // TODO: waiting for orphan blocks mechanism to be ready async getChainsAccountInfo( accountName: string, fungibleName: string, @@ -76,6 +78,7 @@ export default class BalanceDbRepository implements BalanceRepository { return output; } + // TODO: waiting for orphan blocks mechanism to be ready async getAccountsByPublicKey( publicKey: string, fungibleName: string, @@ -120,6 +123,7 @@ export default class BalanceDbRepository implements BalanceRepository { return output; } + // TODO: waiting for orphan blocks mechanism to be ready async getChainAccountsByPublicKey( publicKey: string, fungibleName: string, @@ -301,11 +305,11 @@ export default class BalanceDbRepository implements BalanceRepository { let chainIdsParam = []; if (!chainIds?.length) { const query = ` - SELECT DISTINCT b."chainId" - FROM "Balances" b - WHERE b.account = $1 - AND b.module = $2 - `; + SELECT DISTINCT b."chainId" + FROM "Balances" b + WHERE b.account = $1 + AND b.module = $2 + `; const { rows } = await rootPgPool.query(query, [ accountName, fungibleName, diff --git a/indexer/src/kadena-server/repository/infra/repository/block-db-repository.ts b/indexer/src/kadena-server/repository/infra/repository/block-db-repository.ts index 7c97480c..b82d547b 100644 --- a/indexer/src/kadena-server/repository/infra/repository/block-db-repository.ts +++ b/indexer/src/kadena-server/repository/infra/repository/block-db-repository.ts @@ -6,6 +6,7 @@ import BlockRepository, { GetBlocksBetweenHeightsParams, GetBlocksFromDepthParams, GetCompletedBlocksParams, + GetLatestBlocksParams, } from "../../application/block-repository"; import { getPageInfo, getPaginationParams } from "../../pagination"; import { blockValidator } from "../schema-validator/block-schema-validator"; @@ -15,6 +16,7 @@ import { formatGuard_NODE } from "../../../../utils/chainweb-node"; import { MEMORY_CACHE } from "../../../../cache/init"; import { NODE_INFO_KEY } from "../../../../cache/keys"; import { GetNodeInfo } from "../../application/network-repository"; +import { TransactionOutput } from "../../application/transaction-repository"; export default class BlockDbRepository implements BlockRepository { async getBlockByHash(hash: string) { @@ -86,23 +88,55 @@ export default class BlockDbRepository implements BlockRepository { last, }); - const query: FindOptions = { - where: { - height: { [Op.gte]: startHeight }, - ...(endHeight && { height: { [Op.lte]: endHeight } }), - ...(after && { id: { [Op.lt]: after } }), - ...(before && { id: { [Op.gt]: before } }), - ...(!!chainIds?.length && { chainId: { [Op.in]: chainIds } }), - }, - limit, - order: [["id", order]], - }; + const queryParams: (string | number | string[])[] = [limit, startHeight]; + let conditions = ""; - const rows = await BlockModel.findAll(query); + if (before) { + queryParams.push(before); + conditions += `\nAND b.id > $${queryParams.length}`; + } - const edges = rows.map((row) => ({ + if (after) { + queryParams.push(after); + conditions += `\nAND b.id < $${queryParams.length}`; + } + + if (chainIds?.length) { + queryParams.push(chainIds); + conditions += `\nAND b."chainId" = $${queryParams.length}`; + } + + if (endHeight) { + queryParams.push(endHeight); + conditions += `\nAND b."height" <= $${queryParams.length}`; + } + + const query = ` + SELECT b.id, + b.hash, + b."chainId", + b."creationTime", + b."epochStart", + b."featureFlags", + b.height as "height", + b.nonce as "nonce", + b."payloadHash" as "payloadHash", + b.weight as "weight", + b.target as "target", + b.adjacents as "adjacents", + b.parent as "parent" + FROM "Blocks" b + WHERE b.height >= $2 + ${conditions} + ORDER BY b.id ${order} + LIMIT $1; + `; + + const { rows: blockRows } = await rootPgPool.query(query, queryParams); + + const edges = blockRows.map((row) => ({ cursor: row.id.toString(), - node: blockValidator.mapFromSequelize(row), + node: blockValidator.validate(row), })); const pageInfo = getPageInfo({ edges, order, limit, after, before }); @@ -421,4 +455,54 @@ export default class BlockDbRepository implements BlockRepository { return block?.transactionsCount || 0; } + + async getLatestBlocks(params: GetLatestBlocksParams): Promise { + const { creationTime, lastBlockId, chainIds = [] } = params; + const blocks = await BlockModel.findAll({ + where: { + ...(lastBlockId && { id: { [Op.gt]: lastBlockId } }), + creationTime: { [Op.gt]: creationTime }, + ...(chainIds.length && { chainId: { [Op.in]: chainIds } }), + }, + limit: 100, + order: [["id", "DESC"]], + }); + + const output = blocks.map((b) => blockValidator.mapFromSequelize(b)); + return output; + } + + async getTransactionsOrderedByBlockDepth( + transactions: TransactionOutput[], + ): Promise { + const query = ` + WITH RECURSIVE BlockDescendants AS ( + SELECT hash, parent, hash AS root_hash, 0 AS depth, height, "chainId" + FROM "Blocks" + WHERE hash = ANY($1::text[]) + UNION ALL + SELECT b.hash, b.parent, d.root_hash, d.depth + 1 AS depth, b.height, b."chainId" + FROM BlockDescendants d + JOIN "Blocks" b ON d.hash = b.parent + AND b.height = d.height + 1 + AND b."chainId" = d."chainId" + WHERE d.depth <= 6 + ) + SELECT root_hash, MAX(depth) AS depth + FROM BlockDescendants + GROUP BY root_hash; + `; + + const { rows } = await rootPgPool.query(query, [ + transactions.map((t) => t.blockHash), + ]); + + rows.sort((a, b) => b.depth - a.depth); + + const output = rows.map((r) => + transactions.find((t) => t.blockHash === r.root_hash), + ) as any; + + return output; + } } diff --git a/indexer/src/kadena-server/repository/infra/repository/event-db-repository.ts b/indexer/src/kadena-server/repository/infra/repository/event-db-repository.ts index 364e6310..16994663 100644 --- a/indexer/src/kadena-server/repository/infra/repository/event-db-repository.ts +++ b/indexer/src/kadena-server/repository/infra/repository/event-db-repository.ts @@ -5,6 +5,7 @@ import EventRepository, { GetBlockEventsParams, GetEventParams, GetEventsParams, + GetLastEventsParams, GetTotalEventsCount, GetTotalTransactionEventsCount, GetTransactionEventsParams, @@ -370,4 +371,67 @@ export default class EventDbRepository implements EventRepository { const totalCount = parseInt(countResult[0].count, 10); return totalCount; } + + async getLastEventId(): Promise { + const query = `SELECT last_value AS lastValue from "Events_id_seq"`; + const { rows } = await rootPgPool.query(query); + const totalCount = parseInt(rows[0].lastValue, 10); + return totalCount; + } + + async getLastEvents({ + qualifiedEventName, + lastEventId, + chainId, + minimumDepth, + }: GetLastEventsParams) { + const queryParams = []; + let conditions = ""; + let limitCondition = lastEventId ? "LIMIT 5" : "LIMIT 100"; + + const splitted = qualifiedEventName.split("."); + const name = splitted.pop() ?? ""; + const module = splitted.join("."); + + queryParams.push(module); + conditions += `WHERE e.module = $${queryParams.length}`; + queryParams.push(name); + conditions += `\nAND e.name = $${queryParams.length}`; + + if (lastEventId) { + queryParams.push(lastEventId); + conditions += `\nAND e.id > $${queryParams.length}`; + } + + if (chainId) { + queryParams.push(parseInt(chainId)); + conditions += `\nAND b."chainId" = $${queryParams.length}`; + } + + const query = ` + SELECT e.id as id, + e.requestkey as "requestKey", + b."chainId" as "chainId", + b.height as height, + e.module as "moduleName", + e."orderIndex" as "orderIndex", + e.name as name, + e.params as parameters, + b.hash as "blockHash" + FROM "Events" e + JOIN "Transactions" t ON e."transactionId" = t.id + JOIN "Blocks" b ON b.id = t."blockId" + ${conditions} + ORDER BY e.id DESC + ${limitCondition} + `; + + const { rows } = await rootPgPool.query(query, queryParams); + + const events = rows + .map((e) => eventValidator.validate(e)) + .sort((a, b) => Number(b.id) - Number(a.id)); + + return events; + } } diff --git a/indexer/src/kadena-server/repository/infra/schema-validator/fungible-chain-account-validator.ts b/indexer/src/kadena-server/repository/infra/schema-validator/fungible-chain-account-validator.ts index d1031756..de2cc15e 100644 --- a/indexer/src/kadena-server/repository/infra/schema-validator/fungible-chain-account-validator.ts +++ b/indexer/src/kadena-server/repository/infra/schema-validator/fungible-chain-account-validator.ts @@ -27,14 +27,7 @@ const validate = (row: any): FungibleChainAccountOutput => { fungibleName: res.module, chainId: res.chainId.toString(), balance: Number(res.balance), - guard: { - keys: row.guard.keys, - predicate: row.guard.predicate, - raw: JSON.stringify({ - keys: row.guard.keys, - predicate: row.guard.predicate, - }), - }, + guard: row.guard, }; }; diff --git a/indexer/src/kadena-server/repository/infra/schema-validator/transaction-schema-validator.ts b/indexer/src/kadena-server/repository/infra/schema-validator/transaction-schema-validator.ts index a10225dd..c9d6ef1d 100644 --- a/indexer/src/kadena-server/repository/infra/schema-validator/transaction-schema-validator.ts +++ b/indexer/src/kadena-server/repository/infra/schema-validator/transaction-schema-validator.ts @@ -52,6 +52,7 @@ function validate(row: any): TransactionOutput { continuation: continuation === "{}" ? null : continuation, eventCount: res.eventCount, transactionId: res.txid ? res.txid : null, + height: res.height, gas: res.gas, goodResult: isSuccess ? JSON.stringify(res.result.data) : null, diff --git a/indexer/src/kadena-server/resolvers/fields/block/pow-hash-block-resolver.ts b/indexer/src/kadena-server/resolvers/fields/block/pow-hash-block-resolver.ts new file mode 100644 index 00000000..fe1f766d --- /dev/null +++ b/indexer/src/kadena-server/resolvers/fields/block/pow-hash-block-resolver.ts @@ -0,0 +1,39 @@ +import { getRequiredEnvString } from "../../../../utils/helpers"; +import { ResolverContext } from "../../../config/apollo-server-config"; +import { BlockResolvers } from "../../../config/graphql-types"; +import crypto from "crypto"; + +const SYNC_BASE_URL = getRequiredEnvString("SYNC_BASE_URL"); +const NETWORK_ID = getRequiredEnvString("SYNC_NETWORK"); + +function base64UrlToBase64(base64url: any) { + // Convert Base64 URL format to standard Base64 + return ( + base64url.replace(/-/g, "+").replace(/_/g, "/") + + "=".repeat((4 - (base64url.length % 4)) % 4) + ); +} + +async function hashWithBlake2s(input: any) { + const normalizedBase64 = base64UrlToBase64(input); + const buffer = Buffer.from(normalizedBase64, "base64"); + const truncatedBuffer = buffer.subarray(0, -32); + const hash = crypto.createHash("blake2s256").update(truncatedBuffer).digest(); + return Buffer.from(hash).reverse().toString("hex"); +} + +export const powHashBlockResolver: BlockResolvers["powHash"] = + async (parent) => { + console.log("powHashBlockResolver"); + + const url = `${SYNC_BASE_URL}/${NETWORK_ID}/chain/${parent.chainId}/header/${parent.hash}`; + const res = await fetch(url, { + method: "GET", + headers: { + "Content-Type": "application/json", + }, + }); + + const output = await res.json(); + return hashWithBlake2s(output); + }; diff --git a/indexer/src/kadena-server/resolvers/index.ts b/indexer/src/kadena-server/resolvers/index.ts index c415f0c9..be132e7a 100644 --- a/indexer/src/kadena-server/resolvers/index.ts +++ b/indexer/src/kadena-server/resolvers/index.ts @@ -29,7 +29,7 @@ import { lastBlockHeightQueryResolver } from "./query/last-block-height-query-re import { networkInfoQueryResolver } from "./query/network-info-query-resolver"; import { chainAccountsFungibleAccountResolver } from "./fields/fungible-account/chain-accounts-fungible-account-resolver"; import { nodeQueryResolver } from "./query/node-query-resolver"; -import { fungibleChainAccountsQueryResolver } from "./query/fungible-chain-account-query-resolver"; +import { fungibleChainAccountsQueryResolver } from "./query/fungible-chain-accounts-query-resolver"; import { transactionsFungibleChainAccountResolver } from "./fields/fungible-chain-account/transactions-fungible-chain-account-resolver"; import { transfersFungibleChainAccountResolver } from "./fields/fungible-chain-account/transfers-fungible-chain-account-resolver"; import { totalCountFungibleAccountTransfersConnectionResolver } from "./fields/fungible-account/transfers-connection/total-count-fungible-account-transfers-connection-resolver"; @@ -62,6 +62,8 @@ import { nonFungibleChainAccountQueryResolver } from "./query/non-fungible-chain import { transactionsNonFungibleChainAccountResolver } from "./fields/non-fungible-chain-account/transactions-non-fungible-chain-account"; import { totalCountNonFungibleAccountTransactionsConnectionResolver } from "./fields/non-fungible-account/transactions-connection/total-count-non-fungible-account-transactions-connection-resolver"; import { totalCountNonFungibleChainAccountTransactionsConnectionResolver } from "./fields/non-fungible-chain-account/transactions-connection/total-count-non-fungible-chain-account-transactions-connection-resolver"; +import { fungibleChainAccountQueryResolver } from "./query/fungible-chain-account-query-resolver"; +import { powHashBlockResolver } from "./fields/block/pow-hash-block-resolver"; export const resolvers: Resolvers = { DateTime: DateTimeResolver, @@ -79,6 +81,7 @@ export const resolvers: Resolvers = { events: eventsQueryResolver, fungibleAccount: fungibleAccountQueryResolver, fungibleAccountsByPublicKey: fungibleAccountsByPublicKeyQueryResolver, + fungibleChainAccount: fungibleChainAccountQueryResolver, fungibleChainAccounts: fungibleChainAccountsQueryResolver, fungibleChainAccountsByPublicKey: fungibleChainAccountsByPublicKeyQueryResolver, @@ -101,6 +104,7 @@ export const resolvers: Resolvers = { events: eventsBlockResolver, // add dataloader minerAccount: minerAccountBlockResolver, // add dataloader transactions: transactionsBlockResolver, // add dataloader + powHash: powHashBlockResolver, }, Event: { block: blockEventResolver, // data loader set. @@ -252,7 +256,13 @@ export const resolvers: Resolvers = { }, IGuard: { __resolveType: (obj: any) => { - return "KeysetGuard"; + if (obj.fun) { + return "UserGuard"; + } + if (obj.keys?.length) { + return "KeysetGuard"; + } + return "RawGuard"; }, }, }; diff --git a/indexer/src/kadena-server/resolvers/node-utils.ts b/indexer/src/kadena-server/resolvers/node-utils.ts index 632d960d..ddee736e 100644 --- a/indexer/src/kadena-server/resolvers/node-utils.ts +++ b/indexer/src/kadena-server/resolvers/node-utils.ts @@ -32,13 +32,14 @@ export const getNode = async (context: ResolverContext, id: string) => { if (type === "FungibleAccount") { const [_fungible, accountName] = JSON.parse(params); - const output = await context.balanceRepository.getAccountInfo(accountName); + const output = + await context.balanceRepository.getAccountInfo_NODE(accountName); return buildFungibleAccount(output); } if (type === "FungibleChainAccount") { const [chainId, fungibleName, accountName] = JSON.parse(params); - const output = await context.balanceRepository.getChainsAccountInfo( + const output = await context.balanceRepository.getChainsAccountInfo_NODE( accountName, fungibleName, [chainId], diff --git a/indexer/src/kadena-server/resolvers/output/build-fungible-chain-account-output.ts b/indexer/src/kadena-server/resolvers/output/build-fungible-chain-account-output.ts index e5db0fef..c767ad6a 100644 --- a/indexer/src/kadena-server/resolvers/output/build-fungible-chain-account-output.ts +++ b/indexer/src/kadena-server/resolvers/output/build-fungible-chain-account-output.ts @@ -1,5 +1,4 @@ import { - Block, FungibleChainAccountTransactionsConnection, FungibleChainAccountTransfersConnection, } from "../../config/graphql-types"; diff --git a/indexer/src/kadena-server/resolvers/query/fungible-chain-account-query-resolver.ts b/indexer/src/kadena-server/resolvers/query/fungible-chain-account-query-resolver.ts index 96801e8d..4bc4d65b 100644 --- a/indexer/src/kadena-server/resolvers/query/fungible-chain-account-query-resolver.ts +++ b/indexer/src/kadena-server/resolvers/query/fungible-chain-account-query-resolver.ts @@ -2,16 +2,16 @@ import { ResolverContext } from "../../config/apollo-server-config"; import { QueryResolvers } from "../../config/graphql-types"; import { buildFungibleChainAccount } from "../output/build-fungible-chain-account-output"; -export const fungibleChainAccountsQueryResolver: QueryResolvers["fungibleChainAccounts"] = +export const fungibleChainAccountQueryResolver: QueryResolvers["fungibleChainAccount"] = async (_parent, args, context) => { - const { accountName, chainIds, fungibleName } = args; - console.log("fungibleChainAccountsQueryResolver"); - const accounts = await context.balanceRepository.getChainsAccountInfo_NODE( + const { accountName, chainId, fungibleName } = args; + console.log("fungibleChainAccountQueryResolver"); + const [account] = await context.balanceRepository.getChainsAccountInfo_NODE( accountName, fungibleName, - chainIds?.map((c) => c.toString()), + [chainId.toString()], ); - const output = accounts.map((r) => buildFungibleChainAccount(r)); + const output = buildFungibleChainAccount(account); return output; }; diff --git a/indexer/src/kadena-server/resolvers/query/fungible-chain-accounts-query-resolver.ts b/indexer/src/kadena-server/resolvers/query/fungible-chain-accounts-query-resolver.ts new file mode 100644 index 00000000..96801e8d --- /dev/null +++ b/indexer/src/kadena-server/resolvers/query/fungible-chain-accounts-query-resolver.ts @@ -0,0 +1,17 @@ +import { ResolverContext } from "../../config/apollo-server-config"; +import { QueryResolvers } from "../../config/graphql-types"; +import { buildFungibleChainAccount } from "../output/build-fungible-chain-account-output"; + +export const fungibleChainAccountsQueryResolver: QueryResolvers["fungibleChainAccounts"] = + async (_parent, args, context) => { + const { accountName, chainIds, fungibleName } = args; + console.log("fungibleChainAccountsQueryResolver"); + const accounts = await context.balanceRepository.getChainsAccountInfo_NODE( + accountName, + fungibleName, + chainIds?.map((c) => c.toString()), + ); + + const output = accounts.map((r) => buildFungibleChainAccount(r)); + return output; + }; diff --git a/indexer/src/kadena-server/resolvers/query/transaction-query-resolver.ts b/indexer/src/kadena-server/resolvers/query/transaction-query-resolver.ts index f403afd7..c5b48aed 100644 --- a/indexer/src/kadena-server/resolvers/query/transaction-query-resolver.ts +++ b/indexer/src/kadena-server/resolvers/query/transaction-query-resolver.ts @@ -1,34 +1,27 @@ import { ResolverContext } from "../../config/apollo-server-config"; import { QueryResolvers } from "../../config/graphql-types"; -import { getBaselineAndOrphanedTransactions } from "../../domain/transaction/transaction-service"; import { buildTransactionOutput } from "../output/build-transaction-output"; export const transactionQueryResolver: QueryResolvers["transaction"] = async (_parent, args, context) => { console.log("transactionQueryResolver"); const { requestKey, blockHash, minimumDepth } = args; - const output = + const transactions = await context.transactionRepository.getTransactionsByRequestKey({ requestKey, blockHash, minimumDepth, }); - // TODO: implement orphaned transactions - // const lastBlockHeight = await context.blockRepository.getLastBlockHeight(); - // const { baselineTransaction, orphanedTransactions } = - // getBaselineAndOrphanedTransactions(output, lastBlockHeight); - // if (!baselineTransaction) return null; - // const orphanedTxs = orphanedTransactions.map((t) => - // buildTransactionOutput(t), - // ); + if (transactions.length === 0) return null; - const baseTx = output.length ? buildTransactionOutput(output[0]) : null; - - if (!baseTx) return null; + const [first, ...rest] = + await context.blockRepository.getTransactionsOrderedByBlockDepth( + transactions, + ); return { - ...baseTx, - orphanedTransactions: [], + ...buildTransactionOutput(first), + orphanedTransactions: rest.map((r) => buildTransactionOutput(r)), }; }; diff --git a/indexer/src/kadena-server/resolvers/subscription/events-subscription-resolver.ts b/indexer/src/kadena-server/resolvers/subscription/events-subscription-resolver.ts index b8a9f65d..9cc3b1ce 100644 --- a/indexer/src/kadena-server/resolvers/subscription/events-subscription-resolver.ts +++ b/indexer/src/kadena-server/resolvers/subscription/events-subscription-resolver.ts @@ -1,54 +1,41 @@ -import { withFilter } from "graphql-subscriptions"; import { ResolverContext } from "../../config/apollo-server-config"; import { SubscriptionResolvers } from "../../config/graphql-types"; -import { eventsQueryResolver } from "../query/events-query-resolver"; -import zod from "zod"; +import { EventOutput } from "../../repository/application/event-repository"; +import { buildEventOutput } from "../output/build-event-output"; -import { EVENTS_EVENT } from "./consts"; +async function* iteratorFn( + context: ResolverContext, + qualifiedEventName: string, + chainId?: string | null, + minimumDepth?: number | null, +): AsyncGenerator { + let lastEventId = await context.eventRepository.getLastEventId(); + while (context.signal) { + const newEvents = await context.eventRepository.getLastEvents({ + qualifiedEventName, + lastEventId, + chainId, + minimumDepth, + }); -const eventsSubscriptionSchema = zod.object({ - chainId: zod.string(), - height: zod.number(), - qualifiedEventName: zod.string(), -}); + if (newEvents.length > 1) { + lastEventId = Number(newEvents[0].id); + yield newEvents.map((e) => buildEventOutput(e)); + } + + await new Promise((resolve) => setTimeout(resolve, 1000)); + } +} export const eventsSubscriptionResolver: SubscriptionResolvers["events"] = { - resolve: async (payload: any, _args: any, context: ResolverContext) => { - const res = await (eventsQueryResolver as any)( - {}, - { - blockHash: payload.hash, - chainId: payload.chainId, - qualifiedEventName: payload.qualifiedEventName, - }, + subscribe: (__root, args, context) => { + return iteratorFn( context, + args.qualifiedEventName, + args.chainId, + args.minimumDepth, ); - return res.edges.map((e: any) => e.node); - }, - subscribe: (_parent, args, context) => { - return { - [Symbol.asyncIterator]: withFilter( - () => context.pubSub.asyncIterator(EVENTS_EVENT), - (payload) => { - const res = eventsSubscriptionSchema.safeParse(payload); - if (!res.success) { - console.info("Invalid payload on eventsSubscription", payload); - return false; - } - const { chainId, height, qualifiedEventName } = res.data; - - if (args.chainId && chainId !== args.chainId) { - return false; - } - - if (args.minimumDepth && height < args.minimumDepth) { - return false; - } - - return qualifiedEventName === args.qualifiedEventName; - }, - ), - }; }, + resolve: (payload: any) => payload, }; diff --git a/indexer/src/kadena-server/resolvers/subscription/new-blocks-subscription-resolver.ts b/indexer/src/kadena-server/resolvers/subscription/new-blocks-subscription-resolver.ts index 05245b4d..add79d64 100644 --- a/indexer/src/kadena-server/resolvers/subscription/new-blocks-subscription-resolver.ts +++ b/indexer/src/kadena-server/resolvers/subscription/new-blocks-subscription-resolver.ts @@ -1,11 +1,7 @@ -import { withFilter } from "graphql-subscriptions"; import { ResolverContext } from "../../config/apollo-server-config"; import { SubscriptionResolvers } from "../../config/graphql-types"; - -import BlockModel from "../../../models/block"; // Sequelize model for blocks -import { Op } from "sequelize"; -import { blockValidator } from "../../repository/infra/schema-validator/block-schema-validator"; import { BlockOutput } from "../../repository/application/block-repository"; +import { buildBlockOutput } from "../output/build-block-output"; async function* iteratorFn( chainIds: string[], @@ -15,63 +11,26 @@ async function* iteratorFn( let lastBlockId: number | undefined; - while (true) { - const newBlocks = await getNewBlocks( - chainIds, - startingTimestamp, + while (context.signal) { + const newBlocks = await context.blockRepository.getLatestBlocks({ + creationTime: startingTimestamp, lastBlockId, - ); + chainIds, + }); if (newBlocks.length > 0) { - lastBlockId = newBlocks[0].id; // Update the last block ID - yield newBlocks.map((block) => blockValidator.mapFromSequelize(block)); + lastBlockId = Number(newBlocks[0].id); + yield newBlocks.map((b) => buildBlockOutput(b)); } await new Promise((resolve) => setTimeout(resolve, 1000)); } } -async function getNewBlocks( - chainIds: string[], - date: number, - lastBlockId?: number, -) { - return BlockModel.findAll({ - where: { - ...(lastBlockId && { id: { [Op.gt]: lastBlockId } }), - creationTime: { [Op.gt]: date }, - ...(chainIds.length && { chainId: { [Op.in]: chainIds } }), - }, - limit: 100, - order: [["id", "DESC"]], - }); -} - export const newBlocksSubscriptionResolver: SubscriptionResolvers["newBlocks"] = { - subscribe: async (_parent, args, context) => { - const chainIds = args.chainIds ?? []; - - const iterator = withFilter( - () => iteratorFn(chainIds, context), - (payload, variables) => { - let subscribedChainIds; - if (variables) { - subscribedChainIds = variables.chainIds; - } else { - subscribedChainIds = []; - } - console.log("subscribedChainIds", subscribedChainIds); - return ( - !subscribedChainIds.length || - subscribedChainIds.includes(payload.chainId) - ); - }, - )(); - - return { - [Symbol.asyncIterator]: () => iterator, - }; + subscribe: (_root, args, context) => { + return iteratorFn(args.chainIds ?? [], context); }, resolve: (payload: any) => payload, }; diff --git a/indexer/src/kadena-server/resolvers/subscription/transaction-subscription-resolver.ts b/indexer/src/kadena-server/resolvers/subscription/transaction-subscription-resolver.ts index 1e587ded..474dc5df 100644 --- a/indexer/src/kadena-server/resolvers/subscription/transaction-subscription-resolver.ts +++ b/indexer/src/kadena-server/resolvers/subscription/transaction-subscription-resolver.ts @@ -1,45 +1,41 @@ -import { withFilter } from "graphql-subscriptions"; import { ResolverContext } from "../../config/apollo-server-config"; -import { - SubscriptionResolvers, - SubscriptionTransactionArgs, -} from "../../config/graphql-types"; +import { SubscriptionResolvers } from "../../config/graphql-types"; +import { TransactionOutput } from "../../repository/application/transaction-repository"; +import { buildTransactionOutput } from "../output/build-transaction-output"; -import { transactionQueryResolver } from "../query/transaction-query-resolver"; -import { TRANSACTION_EVENT } from "./consts"; -import zod from "zod"; +async function* iteratorFn( + requestKey: string, + context: ResolverContext, + chainId?: string | null, +): AsyncGenerator { + while (context.signal) { + const { edges } = await context.transactionRepository.getTransactions({ + requestKey, + chainId, + }); + const transactions = edges.map((e) => e.node); -const newTransactionSubscriptionSchema = zod.object({ - requestKey: zod.string(), - chainId: zod.string(), -}); + if (transactions.length > 0) { + const [first, ...rest] = + await context.blockRepository.getTransactionsOrderedByBlockDepth( + transactions, + ); + + const result = { + ...buildTransactionOutput(first), + orphanedTransactions: rest.map((r) => buildTransactionOutput(r)), + }; + + yield result; + return; + } + } +} export const transactionSubscriptionResolver: SubscriptionResolvers["transaction"] = { - resolve: async (payload: any, _args: any, context: ResolverContext) => { - return (transactionQueryResolver as any)( - {}, - { requestKey: payload.requestKey, chainId: payload.chainId }, - context, - ); - }, - subscribe: (_parent, args, context) => { - return { - [Symbol.asyncIterator]: withFilter( - () => context.pubSub.asyncIterator([TRANSACTION_EVENT]), - (payload) => { - const res = newTransactionSubscriptionSchema.safeParse(payload); - if (!res.success) { - console.info( - "Invalid payload on newTransactionSubscriptionSchema", - payload, - ); - return false; - } - const { requestKey, chainId } = res.data; - return requestKey === args.requestKey && chainId === args.chainId; - }, - ), - }; + subscribe: (_root, args, context) => { + return iteratorFn(args.requestKey, context, args.chainId); }, + resolve: (payload: any) => payload, }; diff --git a/indexer/src/kadena-server/server.ts b/indexer/src/kadena-server/server.ts index 94f09f05..c1e2e8cc 100644 --- a/indexer/src/kadena-server/server.ts +++ b/indexer/src/kadena-server/server.ts @@ -15,7 +15,7 @@ import { import { WebSocketServer } from "ws"; import { useServer } from "graphql-ws/lib/use/ws"; import { makeExecutableSchema } from "@graphql-tools/schema"; -import { ArgumentNode, ASTNode, GraphQLError, Kind, ValueNode } from "graphql"; +import { ArgumentNode, ASTNode, GraphQLError, Kind } from "graphql"; import { EVENTS_EVENT, NEW_BLOCKS_EVENT, @@ -151,7 +151,18 @@ export async function useKadenaGraphqlServer() { const serverCleanup = useServer( { schema, - context, + context: async (ctx) => { + const abortController = new AbortController(); + + ctx.extra.socket.addEventListener("close", () => { + abortController.abort(); // Only aborts this specific subscription + }); + + return { + ...context, + signal: abortController.signal, // Pass signal per subscription + }; + }, }, wsServer, ); diff --git a/indexer/src/services/sync/coinbase.ts b/indexer/src/services/sync/coinbase.ts new file mode 100644 index 00000000..da761157 --- /dev/null +++ b/indexer/src/services/sync/coinbase.ts @@ -0,0 +1,156 @@ +import { closeDatabase, rootPgPool, sequelize } from "../../config/database"; +import TransactionModel from "../../models/transaction"; +import Transfer from "../../models/transfer"; +import { Transaction } from "sequelize"; +import Event, { EventAttributes } from "../../models/event"; +import { getCoinTransfers } from "./transfers"; + +export async function startBackfillCoinbaseTransactions() { + console.log("Starting coinbase backfill ..."); + + const limit = 1000; // Number of rows to process in one batch + let offset = 0; + + while (true) { + console.log(`Fetching rows from offset: ${offset}, limit: ${limit}`); + const res = await rootPgPool.query( + `SELECT b.id, b.coinbase, b."chainId", b."creationTime" FROM "Blocks" b ORDER BY b.id LIMIT $1 OFFSET $2`, + [limit, offset], + ); + + const rows = res.rows; + if (rows.length === 0) { + console.log("No more rows to process."); + break; + } + + const tx = await sequelize.transaction(); + try { + await addCoinbaseTransactions(rows, tx); + await tx.commit(); + console.log(`Batch at offset ${offset} processed successfully.`); + offset += limit; + } catch (batchError) { + console.error(`Error processing batch at offset ${offset}:`, batchError); + try { + await tx.rollback(); + console.log(`Transaction for batch at offset ${offset} rolled back.`); + } catch (rollbackError) { + console.error("Error during rollback:", rollbackError); + } + break; + } + } + + await closeDatabase(); + process.exit(0); +} + +async function addCoinbaseTransactions(rows: Array, tx: Transaction) { + const fetchPromises = rows.map(async (row, index) => { + const output = await processCoinbaseTransaction(row.coinbase, { + id: row.id, + chainId: row.chainId, + creationTime: row.creationTime, + }); + return output; + }); + + const allData = (await Promise.all(fetchPromises)).filter( + (f) => f !== undefined, + ); + + const transactionsAdded = await TransactionModel.bulkCreate( + allData.map((o) => o?.transactionAttributes ?? []), + { + transaction: tx, + returning: ["id"], + }, + ); + + const transfersToAdd = allData + .map((d, index) => { + const transfersWithTransactionId = (d?.transfersCoinAttributes ?? []).map( + (t) => ({ + ...t, + transactionId: transactionsAdded[index].id, + }), + ); + return transfersWithTransactionId; + }) + .flat(); + + const eventsToAdd = allData + .map((d, index) => { + const eventsWithTransactionId = (d?.eventsAttributes ?? []).map((t) => ({ + ...t, + transactionId: transactionsAdded[index].id, + })); + return eventsWithTransactionId; + }) + .flat(); + + await Transfer.bulkCreate(transfersToAdd, { + transaction: tx, + }); + + await Event.bulkCreate(eventsToAdd, { + transaction: tx, + }); +} + +export async function processCoinbaseTransaction( + coinbase: any, + block: { id: number; chainId: number; creationTime: bigint }, +) { + if (!coinbase) return; + + const eventsData = coinbase.events || []; + const transactionAttributes = { + blockId: block.id, + code: {}, + data: {}, + chainId: block.chainId, + creationtime: block.creationTime, + gaslimit: "0", + gasprice: "0", + hash: coinbase.reqKey, + nonce: "", + pactid: null, + continuation: {}, + gas: "0", + result: coinbase.result, + logs: coinbase.logs, + num_events: eventsData ? eventsData.length : 0, + requestkey: coinbase.reqKey, + rollback: null, + sender: "coinbase", + sigs: [], + step: null, + proof: null, + ttl: "0", + txid: coinbase.txId.toString(), + } as any; + + const transfersCoinAttributes = await getCoinTransfers( + eventsData, + transactionAttributes, + ); + + const eventsAttributes = eventsData.map((eventData: any) => { + return { + chainId: transactionAttributes.chainId, + module: eventData.module.namespace + ? `${eventData.module.namespace}.${eventData.module.name}` + : eventData.module.name, + name: eventData.name, + params: eventData.params, + qualname: eventData.module.namespace + ? `${eventData.module.namespace}.${eventData.module.name}` + : eventData.module.name, + requestkey: coinbase.reqKey, + } as EventAttributes; + }) as EventAttributes[]; + + return { transactionAttributes, eventsAttributes, transfersCoinAttributes }; +} diff --git a/indexer/src/services/sync/payload.ts b/indexer/src/services/sync/payload.ts index 8daca587..c6f16a53 100644 --- a/indexer/src/services/sync/payload.ts +++ b/indexer/src/services/sync/payload.ts @@ -7,9 +7,10 @@ import Transfer, { TransferAttributes } from "../../models/transfer"; import { getNftTransfers, getCoinTransfers } from "./transfers"; import { QueryTypes, Transaction } from "sequelize"; import Signer from "../../models/signer"; -import Guard, { GuardAttributes } from "../../models/guard"; +import Guard from "../../models/guard"; import { handleSingleQuery } from "../../kadena-server/utils/raw-query"; import { sequelize } from "../../config/database"; +import { processCoinbaseTransaction } from "./coinbase"; const TRANSACTION_INDEX = 0; const RECEIPT_INDEX = 1; @@ -32,6 +33,12 @@ export async function processPayloadKey( processTransaction(transactionArray, block, tx), ); + await processCoinbaseTransaction(payloadData.coinbase, { + id: block.id, + chainId: block.chainId, + creationTime: block.creationTime, + }); + return (await Promise.all(transactionPromises)).flat(); } @@ -80,6 +87,7 @@ export async function processTransaction( sender: cmdData?.meta?.sender || null, sigs: sigsData, step: cmdData?.payload?.cont?.step || 0, + proof: cmdData?.payload?.cont?.proof || null, ttl: cmdData.meta.ttl, txid: receiptInfo.txId ? receiptInfo.txId.toString() : null, } as TransactionAttributes; @@ -102,14 +110,12 @@ export async function processTransaction( const transfersCoinAttributes = await getCoinTransfers( eventsData, transactionAttributes, - receiptInfo, ); const transfersNftAttributes = await getNftTransfers( transactionAttributes.chainId, eventsData, transactionAttributes, - receiptInfo, ); const transfersAttributes = [transfersCoinAttributes, transfersNftAttributes] diff --git a/indexer/src/services/sync/streaming.ts b/indexer/src/services/sync/streaming.ts index 47d1c8dc..2fce5e5a 100644 --- a/indexer/src/services/sync/streaming.ts +++ b/indexer/src/services/sync/streaming.ts @@ -14,7 +14,6 @@ export async function startStreaming() { console.log("Starting streaming..."); const blocksAlreadyReceived = new Set(); - const blocksQueue: Array = []; const eventSource = new EventSource( `${SYNC_BASE_URL}/${SYNC_NETWORK}/block/updates`, @@ -40,7 +39,6 @@ export async function startStreaming() { }); return; } - blocksQueue.push(blockData); } catch (error) { console.log(error); } diff --git a/indexer/src/services/sync/transfers.ts b/indexer/src/services/sync/transfers.ts index b9309df5..a11f2c30 100644 --- a/indexer/src/services/sync/transfers.ts +++ b/indexer/src/services/sync/transfers.ts @@ -17,7 +17,6 @@ export function getNftTransfers( chainId: number, eventsData: any, transactionAttributes: TransactionAttributes, - receiptInfo: any, ) { const TRANSFER_NFT_SIGNATURE = "TRANSFER"; const TRANSFER_NFT_PARAMS_LENGTH = 4; @@ -51,7 +50,7 @@ export function getNftTransfers( from_acct: from_acct, modulehash: eventData.moduleHash, modulename: modulename, - requestkey: receiptInfo.reqKey, + requestkey: transactionAttributes.requestkey, to_acct: to_acct, hasTokenId: true, tokenId: tokenId, @@ -71,13 +70,12 @@ const requests: Record = {}; * * @param {Array} eventsData - The array of event data from a transaction payload. * @param {TransactionAttributes} transactionAttributes - Transaction attributes associated with the events. - * @param {any} receiptInfo - Receipt information associated with the events. + * @param {any} requestKey - Associated to the T. * @returns {Promise} A Promise that resolves to an array of transfer attributes specifically for coin transfers. */ export function getCoinTransfers( eventsData: any, transactionAttributes: TransactionAttributes, - receiptInfo: any, ) { const TRANSFER_COIN_SIGNATURE = "TRANSFER"; const TRANSFER_COIN_PARAMS_LENGTH = 3; @@ -115,7 +113,7 @@ export function getCoinTransfers( "fungible", null, null, - Number(precisionData.result), + Number(JSON.parse(precisionData.result).int), ); } console.log(precisionData); @@ -134,7 +132,7 @@ export function getCoinTransfers( modulename: eventData.module.namespace ? `${eventData.module.namespace}.${eventData.module.name}` : eventData.module.name, - requestkey: receiptInfo.reqKey, + requestkey: transactionAttributes.requestkey, to_acct: to_acct, hasTokenId: false, tokenId: undefined, diff --git a/indexer/src/utils/chainweb-node.ts b/indexer/src/utils/chainweb-node.ts index 38a39a0e..bbf477f0 100644 --- a/indexer/src/utils/chainweb-node.ts +++ b/indexer/src/utils/chainweb-node.ts @@ -13,9 +13,24 @@ export const formatBalance_NODE = (queryResult: PactQueryResponse) => { export const formatGuard_NODE = (queryResult: PactQueryResponse) => { const resultParsed = JSON.parse(queryResult.result ?? "{}"); - return { - keys: resultParsed.guard.keys, - predicate: resultParsed.guard.pred, - raw: JSON.stringify(resultParsed.guard), - }; + + if (resultParsed.guard?.fun) { + return { + args: resultParsed.guard.args.map((arg: any) => JSON.stringify(arg)), + fun: resultParsed.guard.fun, + raw: JSON.stringify(resultParsed.guard), + keys: [], + predicate: "", + }; + } + + if (resultParsed.guard?.pred) { + return { + keys: resultParsed.guard.keys, + predicate: resultParsed.guard.pred, + raw: JSON.stringify(resultParsed.guard), + }; + } + + return { raw: JSON.stringify(resultParsed.guard), keys: [], predicate: "" }; };