Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions indexer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const options = program.opts();
*/
async function main() {
try {
console.info('Starting v1.0.2.2 with pair creation');
setupAssociations();
PriceUpdaterService.getInstance();

Expand Down
42 changes: 21 additions & 21 deletions indexer/src/services/pair-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,15 @@ export class PairService {
batches.push(pairs.slice(i, i + BATCH_SIZE));
}

console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} pairs each`);
// console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} pairs each`);

// Process batches sequentially
for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
const batch = batches[batchIndex];
const progressPercentage = (((batchIndex + 1) / batches.length) * 100).toFixed(2);
console.log(
`Progress: ${progressPercentage}% (Create Pairs Batch ${batchIndex + 1}/${batches.length})`,
);
// console.log(
// `Progress: ${progressPercentage}% (Create Pairs Batch ${batchIndex + 1}/${batches.length})`,
// );

const tx = transaction || (await sequelize.transaction());
try {
Expand Down Expand Up @@ -173,7 +173,7 @@ export class PairService {
console.error(`Error processing batch ${batchIndex + 1}/${batches.length}:`, error);
}
}
console.log('Finished processing all pair creation batches');
// console.log('Finished processing all pair creation batches');
}

/**
Expand Down Expand Up @@ -227,15 +227,15 @@ export class PairService {
batches.push(updateEvents.slice(i, i + BATCH_SIZE));
}

console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} events each`);
// console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} events each`);

// Process batches sequentially
for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
const batch = batches[batchIndex];
const progressPercentage = (((batchIndex + 1) / batches.length) * 100).toFixed(2);
console.log(
`Progress: ${progressPercentage}% (Update Pairs Batch ${batchIndex + 1}/${batches.length})`,
);
// console.log(
// `Progress: ${progressPercentage}% (Update Pairs Batch ${batchIndex + 1}/${batches.length})`,
// );

const tx = transaction || (await sequelize.transaction());
try {
Expand Down Expand Up @@ -408,7 +408,7 @@ export class PairService {
console.error(`Error processing batch ${batchIndex + 1}/${batches.length}:`, error);
}
}
console.log('Finished processing all update batches');
// console.log('Finished processing all update batches');
}

/**
Expand Down Expand Up @@ -629,15 +629,15 @@ export class PairService {
batches.push(swapEvents.slice(i, i + BATCH_SIZE));
}

console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} swap events each`);
// console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} swap events each`);

// Process batches sequentially
for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
const batch = batches[batchIndex];
const progressPercentage = (((batchIndex + 1) / batches.length) * 100).toFixed(2);
console.log(
`Progress: ${progressPercentage}% (Process Swaps Batch ${batchIndex + 1}/${batches.length})`,
);
// console.log(
// `Progress: ${progressPercentage}% (Process Swaps Batch ${batchIndex + 1}/${batches.length})`,
// );

const tx = transaction || (await sequelize.transaction());
try {
Expand Down Expand Up @@ -726,7 +726,7 @@ export class PairService {
console.error(`Error processing batch ${batchIndex + 1}/${batches.length}:`, error);
}
}
console.log('Finished processing all swap batches');
// console.log('Finished processing all swap batches');
}

/**
Expand Down Expand Up @@ -758,17 +758,17 @@ export class PairService {
batches.push(liquidityEvents.slice(i, i + BATCH_SIZE));
}

console.log(
`Starting to process ${batches.length} batches of ${BATCH_SIZE} liquidity events each`,
);
// console.log(
// `Starting to process ${batches.length} batches of ${BATCH_SIZE} liquidity events each`,
// );

// Process batches sequentially
for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
const batch = batches[batchIndex];
const progressPercentage = (((batchIndex + 1) / batches.length) * 100).toFixed(2);
console.log(
`Progress: ${progressPercentage}% (Process Liquidity Events Batch ${batchIndex + 1}/${batches.length})`,
);
// console.log(
// `Progress: ${progressPercentage}% (Process Liquidity Events Batch ${batchIndex + 1}/${batches.length})`,
// );

const tx = transaction || (await sequelize.transaction());
try {
Expand Down
33 changes: 21 additions & 12 deletions indexer/src/services/pair.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import { Op, WhereOptions, Transaction as SequelizeTransaction } from 'sequelize
import Transaction from '../models/transaction';
import { DEFAULT_PROTOCOL } from '../kadena-server/config/apollo-server-config';
import Block from '@/models/block';
import { sequelize } from '@/config/database';

const MODULE_NAMES = [`${DEFAULT_PROTOCOL}`, `${DEFAULT_PROTOCOL}-tokens`];
const EVENT_TYPES = ['CREATE_PAIR', 'UPDATE', 'SWAP', 'ADD_LIQUIDITY', 'REMOVE_LIQUIDITY'];
const EXCHANGE_TOKEN_EVENTS = ['MINT_EVENT', 'BURN_EVENT', 'TRANSFER_EVENT'];
export const MODULE_NAMES = [`${DEFAULT_PROTOCOL}`, `${DEFAULT_PROTOCOL}-tokens`];
export const EVENT_TYPES = ['CREATE_PAIR', 'UPDATE', 'SWAP', 'ADD_LIQUIDITY', 'REMOVE_LIQUIDITY'];
export const EXCHANGE_TOKEN_EVENTS = ['MINT_EVENT', 'BURN_EVENT', 'TRANSFER_EVENT'];

const LAST_BLOCK_ID = process.env.BACKFILL_PAIR_EVENTS_LAST_BLOCK_ID
? Number(process.env.BACKFILL_PAIR_EVENTS_LAST_BLOCK_ID)
Expand Down Expand Up @@ -164,7 +165,6 @@ export async function backfillPairEvents(
include: [
{
model: Block,
as: 'block',
attributes: ['height'],
where: {
canonical: true,
Expand All @@ -182,14 +182,22 @@ export async function backfillPairEvents(
continue;
}

const progressPercentage = ((processedCount / LAST_BLOCK_ID) * 100).toFixed(2);
console.log(
`Processing batch of ${events.length} events starting from offset ${processedCount} (${progressPercentage}% complete)`,
);
await processPairCreationEvents(
events.map(event => event.get({ plain: true })),
null,
);
// const progressPercentage = ((processedCount / LAST_BLOCK_ID) * 100).toFixed(2);
// console.log(
// `Processing batch of ${events.length} events starting from offset ${processedCount} (${progressPercentage}% complete)`,
// );

const tx = await sequelize.transaction();
try {
await processPairCreationEvents(
events.map(event => event.get({ plain: true })),
tx,
);
await tx.commit();
} catch (error) {
await tx.rollback();
console.error('[ERROR][DATA][DATA_CORRUPT] Error processing pair creation events:', error);
}
processedCount += events.length;

const endTime = Date.now();
Expand All @@ -202,4 +210,5 @@ export async function backfillPairEvents(
}

console.log(`Backfill completed. Processed ${processedCount} events.`);
process.exit(0);
}
15 changes: 0 additions & 15 deletions indexer/src/services/payload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,27 +230,12 @@ export async function processTransaction(
);

const events = await Promise.all(eventsAttributes);
// Note: debug logging like this should not be submitted to the 'main' branch
// const swapEvents = events.filter(event => event.name === 'SWAP');
// console.log('swapEvents', JSON.stringify(swapEvents, null, 2));
// const addLiquidityEvents = events.filter(event => event.name === 'ADD_LIQUIDITY');
// console.log('addLiquidityEvents', JSON.stringify(addLiquidityEvents, null, 2));
// const mintEvents = events.filter(event => event.name === 'MINT_EVENT');
// console.log('mintEvents', JSON.stringify(mintEvents, null, 2));
// console.log('------------------------------------- end -------------------------------');
const eventsWithTransactionId = events.map(event => ({
...event,
transactionId,
}));
await Event.bulkCreate(eventsWithTransactionId, { transaction: tx });

// Process pair creation events
try {
await processPairCreationEvents(eventsWithTransactionId, tx);
} catch (error) {
console.error('Error processing pair creation events:', error);
}

const signers = (cmdData.signers ?? []).map((signer: any, index: number) => ({
address: signer.address,
orderIndex: index,
Expand Down
101 changes: 101 additions & 0 deletions indexer/src/services/start-pair-creation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { rootPgPool, sequelize } from '@/config/database';
import { EventAttributes } from '@/models/event';
import {
EVENT_TYPES,
EXCHANGE_TOKEN_EVENTS,
MODULE_NAMES,
processPairCreationEvents,
} from '@/services/pair';
import { isNullOrUndefined } from '@/utils/helpers';
import { Transaction as SeqTransaction } from 'sequelize';

// Height of the last block whose pair events were processed; null until the
// first invocation of startPairCreation seeds it. Module-level so progress
// survives across periodic (setInterval) invocations.
let lastHeightProcessed: number | null = null;
// Confirmation depth: stay this many blocks behind the chain tip so we only
// process blocks unlikely to be re-orged out of the canonical chain.
const BLOCK_DEPTH = 6;

/**
 * Periodically scans recently indexed blocks (kept BLOCK_DEPTH behind the
 * chain tip) for pair-related events and runs pair-creation processing over
 * them inside a single DB transaction.
 *
 * Progress is tracked in the module-level `lastHeightProcessed` cursor, which
 * only advances after a successful commit so a failed window is retried on
 * the next invocation. Intended to be driven by setInterval; all failures are
 * logged and swallowed so the schedule keeps running.
 */
export async function startPairCreation() {
  let events: EventAttributes[] = [];
  let currentHeight: number;

  try {
    const result: any = await rootPgPool.query('SELECT max(height) FROM "Blocks"');
    // Guard the row access as well: an empty result set would otherwise make
    // `rows?.[0].max` throw a TypeError instead of falling through to the
    // null/undefined check below.
    const maxHeight = result.rows?.[0]?.max;

    if (isNullOrUndefined(maxHeight)) {
      console.error('[ERROR][DATA][DATA_CORRUPT] Failed to get max height from database');
      return;
    }

    currentHeight = maxHeight - BLOCK_DEPTH;
    if (lastHeightProcessed === null) {
      // First run: seed the cursor a small window behind the current height
      // so we backfill the most recent blocks once.
      lastHeightProcessed = currentHeight - 10;
    }

    if (currentHeight - lastHeightProcessed <= 0) {
      console.info('[INFO][SYNC][STREAMING] No new blocks to process', {
        lastHeightProcessed,
        currentHeight,
      });
      return;
    }

    const EVENT_NAMES = [...EVENT_TYPES, ...EXCHANGE_TOKEN_EVENTS];
    const queryParams = [lastHeightProcessed, currentHeight, EVENT_NAMES, MODULE_NAMES];
    const query = `
      SELECT
        e.id,
        e."transactionId",
        e."chainId",
        e.module,
        e.name,
        e.params,
        e.qualname,
        e.requestkey,
        e."orderIndex"
      FROM "Events" e
      JOIN "Transactions" t ON t.id = e."transactionId"
      JOIN "Blocks" b ON b.id = t."blockId"
      WHERE b.height > $1 AND b.height <= $2
      AND e."name" = ANY($3::text[])
      AND e."module" = ANY($4::text[])
    `;

    const eventsResult = await rootPgPool.query(query, queryParams);

    // NOTE(review): blockId, height and createdAt are NOT selected by the
    // query above, so these fields are undefined here — confirm that
    // processPairCreationEvents does not rely on them.
    events = eventsResult.rows.map(r => {
      return {
        name: r.name,
        module: r.module,
        params: r.params,
        transactionId: r.transactionId,
        blockId: r.blockId,
        height: r.height,
        createdAt: r.createdAt,
        chainId: r.chainId,
        qualname: r.qualname,
        requestkey: r.requestkey,
        orderIndex: r.orderIndex,
        id: r.id,
      };
    }) as EventAttributes[];
  } catch (error) {
    console.error('[ERROR][DATA][DATA_CORRUPT] Failed to get events:', error);
    return;
  }

  // Empty window: advance the cursor without the cost of opening and
  // committing a no-op transaction, so the same range is not re-scanned.
  if (events.length === 0) {
    lastHeightProcessed = currentHeight;
    return;
  }

  let pairCreationTx: SeqTransaction;
  try {
    pairCreationTx = await sequelize.transaction();
  } catch (error) {
    console.error('[ERROR][DATA][DATA_CORRUPT] Failed to create pair creation transaction:', error);
    return;
  }

  try {
    await processPairCreationEvents(events, pairCreationTx);
    await pairCreationTx.commit();
    // Only advance after a successful commit — a failed window is retried.
    lastHeightProcessed = currentHeight;
  } catch (error) {
    await pairCreationTx.rollback();
    console.error('[ERROR][DATA][DATA_CORRUPT] Error processing pair creation events:', error);
  }
}
4 changes: 4 additions & 0 deletions indexer/src/services/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { PriceUpdaterService } from './price/price-updater.service';
import { defineCanonicalBaseline } from '@/services/define-canonical';
import { startMissingBlocksBeforeStreamingProcess } from '@/services/missing';
import { EventAttributes } from '@/models/event';
import { startPairCreation } from '@/services/start-pair-creation';

const SYNC_BASE_URL = getRequiredEnvString('SYNC_BASE_URL');
const SYNC_NETWORK = getRequiredEnvString('SYNC_NETWORK');
Expand Down Expand Up @@ -142,6 +143,9 @@ export async function startStreaming() {

processBlocks();
backfillGuards();

// Schedule a periodic check of pair creation events every 2 minutes
setInterval(startPairCreation, 1000 * 60 * 2);
}

/**
Expand Down