diff --git a/indexer/src/index.ts b/indexer/src/index.ts
index 676e0411..09fd9a3a 100644
--- a/indexer/src/index.ts
+++ b/indexer/src/index.ts
@@ -47,6 +47,7 @@ const options = program.opts();
  */
 async function main() {
   try {
+    console.info('Starting v1.0.2.2 with pair creation');
     setupAssociations();
     PriceUpdaterService.getInstance();
 
diff --git a/indexer/src/services/pair-service.ts b/indexer/src/services/pair-service.ts
index 0249f902..4f47a442 100644
--- a/indexer/src/services/pair-service.ts
+++ b/indexer/src/services/pair-service.ts
@@ -127,15 +127,17 @@ export class PairService {
       batches.push(pairs.slice(i, i + BATCH_SIZE));
     }
 
-    console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} pairs each`);
+    // console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} pairs each`);
 
     // Process batches sequentially
     for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
       const batch = batches[batchIndex];
       const progressPercentage = (((batchIndex + 1) / batches.length) * 100).toFixed(2);
-      console.log(
-        `Progress: ${progressPercentage}% (Create Pairs Batch ${batchIndex + 1}/${batches.length})`,
-      );
+      // console.log(
+      //   `Progress: ${progressPercentage}% (Create Pairs Batch ${batchIndex + 1}/${batches.length})`,
+      // );
 
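+      // Reuse the caller's transaction when one is provided; otherwise open a
+      // transaction scoped to this batch.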
       const tx = transaction || (await sequelize.transaction());
       try {
@@ -173,7 +173,7 @@ export class PairService {
         console.error(`Error processing batch ${batchIndex + 1}/${batches.length}:`, error);
       }
     }
-    console.log('Finished processing all pair creation batches');
+    // console.log('Finished processing all pair creation batches');
   }
 
   /**
@@ -227,15 +227,15 @@ export class PairService {
       batches.push(updateEvents.slice(i, i + BATCH_SIZE));
     }
 
-    console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} events each`);
+    // console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} events each`);
 
     // Process batches sequentially
     for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
       const batch = batches[batchIndex];
       const progressPercentage = (((batchIndex + 1) / batches.length) * 100).toFixed(2);
-      console.log(
-        `Progress: ${progressPercentage}% (Update Pairs Batch ${batchIndex + 1}/${batches.length})`,
-      );
+      // console.log(
+      //   `Progress: ${progressPercentage}% (Update Pairs Batch ${batchIndex + 1}/${batches.length})`,
+      // );
 
       const tx = transaction || (await sequelize.transaction());
       try {
@@ -408,7 +408,7 @@ export class PairService {
         console.error(`Error processing batch ${batchIndex + 1}/${batches.length}:`, error);
       }
     }
-    console.log('Finished processing all update batches');
+    // console.log('Finished processing all update batches');
   }
 
   /**
@@ -629,15 +629,15 @@ export class PairService {
       batches.push(swapEvents.slice(i, i + BATCH_SIZE));
     }
 
-    console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} swap events each`);
+    // console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} swap events each`);
 
     // Process batches sequentially
     for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
       const batch = batches[batchIndex];
       const progressPercentage = (((batchIndex + 1) / batches.length) * 100).toFixed(2);
-      console.log(
-        `Progress: ${progressPercentage}% (Process Swaps Batch ${batchIndex + 1}/${batches.length})`,
-      );
+      // console.log(
+      //   `Progress: ${progressPercentage}% (Process Swaps Batch ${batchIndex + 1}/${batches.length})`,
+      // );
 
       const tx = transaction || (await sequelize.transaction());
       try {
@@ -726,7 +726,7 @@ export class PairService {
         console.error(`Error processing batch ${batchIndex + 1}/${batches.length}:`, error);
       }
     }
-    console.log('Finished processing all swap batches');
+    // console.log('Finished processing all swap batches');
   }
 
   /**
@@ -758,17 +758,17 @@ export class PairService {
       batches.push(liquidityEvents.slice(i, i + BATCH_SIZE));
     }
 
-    console.log(
-      `Starting to process ${batches.length} batches of ${BATCH_SIZE} liquidity events each`,
-    );
+    // console.log(
+    //   `Starting to process ${batches.length} batches of ${BATCH_SIZE} liquidity events each`,
+    // );
 
     // Process batches sequentially
    for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
       const batch = batches[batchIndex];
       const progressPercentage = (((batchIndex + 1) / batches.length) * 100).toFixed(2);
-      console.log(
-        `Progress: ${progressPercentage}% (Process Liquidity Events Batch ${batchIndex + 1}/${batches.length})`,
-      );
+      // console.log(
+      //   `Progress: ${progressPercentage}% (Process Liquidity Events Batch ${batchIndex + 1}/${batches.length})`,
+      // );
 
       const tx = transaction || (await sequelize.transaction());
       try {
diff --git a/indexer/src/services/pair.ts b/indexer/src/services/pair.ts
index c1eee452..ddb04a33 100644
--- a/indexer/src/services/pair.ts
+++ b/indexer/src/services/pair.ts
@@ -5,10 +5,11 @@ import { Op, WhereOptions, Transaction as SequelizeTransaction } from 'sequelize';
 import Transaction from '../models/transaction';
 import { DEFAULT_PROTOCOL } from '../kadena-server/config/apollo-server-config';
 import Block from '@/models/block';
+import { sequelize } from '@/config/database';
 
-const MODULE_NAMES = [`${DEFAULT_PROTOCOL}`, `${DEFAULT_PROTOCOL}-tokens`];
-const EVENT_TYPES = ['CREATE_PAIR', 'UPDATE', 'SWAP', 'ADD_LIQUIDITY', 'REMOVE_LIQUIDITY'];
-const EXCHANGE_TOKEN_EVENTS = ['MINT_EVENT', 'BURN_EVENT', 'TRANSFER_EVENT'];
+export const MODULE_NAMES = [`${DEFAULT_PROTOCOL}`, `${DEFAULT_PROTOCOL}-tokens`];
+export const EVENT_TYPES = ['CREATE_PAIR', 'UPDATE', 'SWAP', 'ADD_LIQUIDITY', 'REMOVE_LIQUIDITY'];
+export const EXCHANGE_TOKEN_EVENTS = ['MINT_EVENT', 'BURN_EVENT', 'TRANSFER_EVENT'];
 
 const LAST_BLOCK_ID = process.env.BACKFILL_PAIR_EVENTS_LAST_BLOCK_ID
   ? Number(process.env.BACKFILL_PAIR_EVENTS_LAST_BLOCK_ID)
@@ -164,7 +165,6 @@ export async function backfillPairEvents(
     include: [
       {
         model: Block,
-        as: 'block',
         attributes: ['height'],
         where: {
           canonical: true,
@@ -182,14 +182,22 @@ export async function backfillPairEvents(
       continue;
     }
 
-    const progressPercentage = ((processedCount / LAST_BLOCK_ID) * 100).toFixed(2);
-    console.log(
-      `Processing batch of ${events.length} events starting from offset ${processedCount} (${progressPercentage}% complete)`,
-    );
-    await processPairCreationEvents(
-      events.map(event => event.get({ plain: true })),
-      null,
-    );
+    // const progressPercentage = ((processedCount / LAST_BLOCK_ID) * 100).toFixed(2);
+    // console.log(
+    //   `Processing batch of ${events.length} events starting from offset ${processedCount} (${progressPercentage}% complete)`,
+    // );
+
+    const tx = await sequelize.transaction();
+    try {
+      await processPairCreationEvents(
+        events.map(event => event.get({ plain: true })),
+        tx,
+      );
+      await tx.commit();
+    } catch (error) {
+      await tx.rollback();
+      console.error('[ERROR][DATA][DATA_CORRUPT] Error processing pair creation events:', error);
+    }
 
     processedCount += events.length;
     const endTime = Date.now();
@@ -202,4 +210,6 @@ export async function backfillPairEvents(
   }
 
   console.log(`Backfill completed. Processed ${processedCount} events.`);
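+  // Exit explicitly so the backfill process terminates once processing is complete.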
+  process.exit(0);
 }
diff --git a/indexer/src/services/payload.ts b/indexer/src/services/payload.ts
index 2ae6a5f3..b94371d5 100644
--- a/indexer/src/services/payload.ts
+++ b/indexer/src/services/payload.ts
@@ -230,27 +230,12 @@ export async function processTransaction(
   );
   const events = await Promise.all(eventsAttributes);
-  // Note: Should not summit debug to the 'main' branch
-  // const swapEvents = events.filter(event => event.name === 'SWAP');
-  // console.log('swapEvents', JSON.stringify(swapEvents, null, 2));
-  // const addLiquidityEvents = events.filter(event => event.name === 'ADD_LIQUIDITY');
-  // console.log('addLiquidityEvents', JSON.stringify(addLiquidityEvents, null, 2));
-  // const mintEvents = events.filter(event => event.name === 'MINT_EVENT');
-  // console.log('mintEvents', JSON.stringify(mintEvents, null, 2));
-  // console.log('------------------------------------- end -------------------------------');
   const eventsWithTransactionId = events.map(event => ({
     ...event,
     transactionId,
   }));
 
   await Event.bulkCreate(eventsWithTransactionId, { transaction: tx });
 
-  // Process pair creation events
-  try {
-    await processPairCreationEvents(eventsWithTransactionId, tx);
-  } catch (error) {
-    console.error('Error processing pair creation events:', error);
-  }
-
   const signers = (cmdData.signers ?? []).map((signer: any, index: number) => ({
     address: signer.address,
     orderIndex: index,
diff --git a/indexer/src/services/start-pair-creation.ts b/indexer/src/services/start-pair-creation.ts
new file mode 100644
index 00000000..14ae0f0f
--- /dev/null
+++ b/indexer/src/services/start-pair-creation.ts
@@ -0,0 +1,108 @@
+import { rootPgPool, sequelize } from '@/config/database';
+import { EventAttributes } from '@/models/event';
+import {
+  EVENT_TYPES,
+  EXCHANGE_TOKEN_EVENTS,
+  MODULE_NAMES,
+  processPairCreationEvents,
+} from '@/services/pair';
+import { isNullOrUndefined } from '@/utils/helpers';
+import { Transaction as SeqTransaction } from 'sequelize';
+
+let lastHeightProcessed: number | null = null; // highest height already scanned
+const BLOCK_DEPTH = 6; // how far to lag behind the chain tip when scanning
+
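+/**
+ * Scan recently indexed blocks for exchange events and replay them through
+ * processPairCreationEvents in one transaction. Invoked on an interval from startStreaming.
+ */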
+export async function startPairCreation() {
+  let pairCreationTx: SeqTransaction;
+  let events: EventAttributes[] = [];
+  let currentHeight: number;
+
+  try {
+    const result: any = await rootPgPool.query('SELECT max(height) FROM "Blocks"');
+    const maxHeight = result.rows?.[0]?.max;
+
+    if (isNullOrUndefined(maxHeight)) {
+      console.error('[ERROR][DATA][DATA_CORRUPT] Failed to get max height from database');
+      return;
+    }
+
+    currentHeight = maxHeight - BLOCK_DEPTH;
+    if (lastHeightProcessed === null) {
+      lastHeightProcessed = currentHeight - 10;
+    }
+
+    if (currentHeight - lastHeightProcessed <= 0) {
+      console.info('[INFO][SYNC][STREAMING] No new blocks to process', {
+        lastHeightProcessed,
+        currentHeight,
+      });
+      return;
+    }
+
+    const EVENT_NAMES = [...EVENT_TYPES, ...EXCHANGE_TOKEN_EVENTS];
+    const queryParams = [lastHeightProcessed, currentHeight, EVENT_NAMES, MODULE_NAMES];
+    const query = `
+      SELECT
+        e.id,
+        e."transactionId",
+        e."chainId",
+        e.module,
+        e.name,
+        e.params,
+        e.qualname,
+        e.requestkey,
+        e."orderIndex",
+        e."createdAt",
+        t."blockId",
+        b.height
+      FROM "Events" e
+      JOIN "Transactions" t ON t.id = e."transactionId"
+      JOIN "Blocks" b ON b.id = t."blockId"
+      WHERE b.height > $1 AND b.height <= $2
+        AND e."name" = ANY($3::text[])
+        AND e."module" = ANY($4::text[])
+    `;
+
+    const eventsResult = await rootPgPool.query(query, queryParams);
+
+    events = eventsResult.rows.map(r => {
+      return {
+        name: r.name,
+        module: r.module,
+        params: r.params,
+        transactionId: r.transactionId,
+        blockId: r.blockId,
+        height: r.height,
+        createdAt: r.createdAt,
+        chainId: r.chainId,
+        qualname: r.qualname,
+        requestkey: r.requestkey,
+        orderIndex: r.orderIndex,
+        id: r.id,
+      };
+    }) as EventAttributes[];
+  } catch (error) {
+    console.error('[ERROR][DATA][DATA_CORRUPT] Failed to get events:', error);
+    return;
+  }
+
+  try {
+    pairCreationTx = await sequelize.transaction();
+  } catch (error) {
+    console.error('[ERROR][DATA][DATA_CORRUPT] Failed to create pair creation transaction:', error);
+    return;
+  }
+
+  try {
+    await processPairCreationEvents(events, pairCreationTx);
+    await pairCreationTx.commit();
+    lastHeightProcessed = currentHeight;
+  } catch (error) {
+    await pairCreationTx.rollback();
+    console.error('[ERROR][DATA][DATA_CORRUPT] Error processing pair creation events:', error);
+  }
+}
diff --git a/indexer/src/services/streaming.ts b/indexer/src/services/streaming.ts
index b4940079..259f6ce1 100644
--- a/indexer/src/services/streaming.ts
+++ b/indexer/src/services/streaming.ts
@@ -25,6 +25,7 @@ import { PriceUpdaterService } from './price/price-updater.service';
 import { defineCanonicalBaseline } from '@/services/define-canonical';
 import { startMissingBlocksBeforeStreamingProcess } from '@/services/missing';
 import { EventAttributes } from '@/models/event';
+import { startPairCreation } from '@/services/start-pair-creation';
 
 const SYNC_BASE_URL = getRequiredEnvString('SYNC_BASE_URL');
 const SYNC_NETWORK = getRequiredEnvString('SYNC_NETWORK');
@@ -142,6 +143,11 @@ export async function startStreaming() {
   processBlocks();
 
   backfillGuards();
+
+  // Schedule a periodic check of pair creation events every 2 minutes.
+  // setInterval does not await the async callback, so a pass that takes longer
+  // than the interval can overlap the next run.
+  setInterval(startPairCreation, 1000 * 60 * 2);
 }
 
 /**