Skip to content

Commit d2f6af1

Browse files
committed
refactor: separated pairCreation process from indexing
1 parent 84e0efe commit d2f6af1

File tree

6 files changed

+148
-48
lines changed

6 files changed

+148
-48
lines changed

indexer/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ const options = program.opts();
4747
*/
4848
async function main() {
4949
try {
50+
console.info('Starting v1.0.2.2 with pair creation');
5051
setupAssociations();
5152
PriceUpdaterService.getInstance();
5253

indexer/src/services/pair-service.ts

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,15 @@ export class PairService {
127127
batches.push(pairs.slice(i, i + BATCH_SIZE));
128128
}
129129

130-
console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} pairs each`);
130+
// console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} pairs each`);
131131

132132
// Process batches sequentially
133133
for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
134134
const batch = batches[batchIndex];
135135
const progressPercentage = (((batchIndex + 1) / batches.length) * 100).toFixed(2);
136-
console.log(
137-
`Progress: ${progressPercentage}% (Create Pairs Batch ${batchIndex + 1}/${batches.length})`,
138-
);
136+
// console.log(
137+
// `Progress: ${progressPercentage}% (Create Pairs Batch ${batchIndex + 1}/${batches.length})`,
138+
// );
139139

140140
const tx = transaction || (await sequelize.transaction());
141141
try {
@@ -173,7 +173,7 @@ export class PairService {
173173
console.error(`Error processing batch ${batchIndex + 1}/${batches.length}:`, error);
174174
}
175175
}
176-
console.log('Finished processing all pair creation batches');
176+
// console.log('Finished processing all pair creation batches');
177177
}
178178

179179
/**
@@ -227,15 +227,15 @@ export class PairService {
227227
batches.push(updateEvents.slice(i, i + BATCH_SIZE));
228228
}
229229

230-
console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} events each`);
230+
// console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} events each`);
231231

232232
// Process batches sequentially
233233
for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
234234
const batch = batches[batchIndex];
235235
const progressPercentage = (((batchIndex + 1) / batches.length) * 100).toFixed(2);
236-
console.log(
237-
`Progress: ${progressPercentage}% (Update Pairs Batch ${batchIndex + 1}/${batches.length})`,
238-
);
236+
// console.log(
237+
// `Progress: ${progressPercentage}% (Update Pairs Batch ${batchIndex + 1}/${batches.length})`,
238+
// );
239239

240240
const tx = transaction || (await sequelize.transaction());
241241
try {
@@ -408,7 +408,7 @@ export class PairService {
408408
console.error(`Error processing batch ${batchIndex + 1}/${batches.length}:`, error);
409409
}
410410
}
411-
console.log('Finished processing all update batches');
411+
// console.log('Finished processing all update batches');
412412
}
413413

414414
/**
@@ -629,15 +629,15 @@ export class PairService {
629629
batches.push(swapEvents.slice(i, i + BATCH_SIZE));
630630
}
631631

632-
console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} swap events each`);
632+
// console.log(`Starting to process ${batches.length} batches of ${BATCH_SIZE} swap events each`);
633633

634634
// Process batches sequentially
635635
for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
636636
const batch = batches[batchIndex];
637637
const progressPercentage = (((batchIndex + 1) / batches.length) * 100).toFixed(2);
638-
console.log(
639-
`Progress: ${progressPercentage}% (Process Swaps Batch ${batchIndex + 1}/${batches.length})`,
640-
);
638+
// console.log(
639+
// `Progress: ${progressPercentage}% (Process Swaps Batch ${batchIndex + 1}/${batches.length})`,
640+
// );
641641

642642
const tx = transaction || (await sequelize.transaction());
643643
try {
@@ -726,7 +726,7 @@ export class PairService {
726726
console.error(`Error processing batch ${batchIndex + 1}/${batches.length}:`, error);
727727
}
728728
}
729-
console.log('Finished processing all swap batches');
729+
// console.log('Finished processing all swap batches');
730730
}
731731

732732
/**
@@ -758,17 +758,17 @@ export class PairService {
758758
batches.push(liquidityEvents.slice(i, i + BATCH_SIZE));
759759
}
760760

761-
console.log(
762-
`Starting to process ${batches.length} batches of ${BATCH_SIZE} liquidity events each`,
763-
);
761+
// console.log(
762+
// `Starting to process ${batches.length} batches of ${BATCH_SIZE} liquidity events each`,
763+
// );
764764

765765
// Process batches sequentially
766766
for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
767767
const batch = batches[batchIndex];
768768
const progressPercentage = (((batchIndex + 1) / batches.length) * 100).toFixed(2);
769-
console.log(
770-
`Progress: ${progressPercentage}% (Process Liquidity Events Batch ${batchIndex + 1}/${batches.length})`,
771-
);
769+
// console.log(
770+
// `Progress: ${progressPercentage}% (Process Liquidity Events Batch ${batchIndex + 1}/${batches.length})`,
771+
// );
772772

773773
const tx = transaction || (await sequelize.transaction());
774774
try {

indexer/src/services/pair.ts

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ import { Op, WhereOptions, Transaction as SequelizeTransaction } from 'sequelize
55
import Transaction from '../models/transaction';
66
import { DEFAULT_PROTOCOL } from '../kadena-server/config/apollo-server-config';
77
import Block from '@/models/block';
8+
import { sequelize } from '@/config/database';
89

9-
const MODULE_NAMES = [`${DEFAULT_PROTOCOL}`, `${DEFAULT_PROTOCOL}-tokens`];
10-
const EVENT_TYPES = ['CREATE_PAIR', 'UPDATE', 'SWAP', 'ADD_LIQUIDITY', 'REMOVE_LIQUIDITY'];
11-
const EXCHANGE_TOKEN_EVENTS = ['MINT_EVENT', 'BURN_EVENT', 'TRANSFER_EVENT'];
10+
export const MODULE_NAMES = [`${DEFAULT_PROTOCOL}`, `${DEFAULT_PROTOCOL}-tokens`];
11+
export const EVENT_TYPES = ['CREATE_PAIR', 'UPDATE', 'SWAP', 'ADD_LIQUIDITY', 'REMOVE_LIQUIDITY'];
12+
export const EXCHANGE_TOKEN_EVENTS = ['MINT_EVENT', 'BURN_EVENT', 'TRANSFER_EVENT'];
1213

1314
const LAST_BLOCK_ID = process.env.BACKFILL_PAIR_EVENTS_LAST_BLOCK_ID
1415
? Number(process.env.BACKFILL_PAIR_EVENTS_LAST_BLOCK_ID)
@@ -164,7 +165,6 @@ export async function backfillPairEvents(
164165
include: [
165166
{
166167
model: Block,
167-
as: 'block',
168168
attributes: ['height'],
169169
where: {
170170
canonical: true,
@@ -182,14 +182,22 @@ export async function backfillPairEvents(
182182
continue;
183183
}
184184

185-
const progressPercentage = ((processedCount / LAST_BLOCK_ID) * 100).toFixed(2);
186-
console.log(
187-
`Processing batch of ${events.length} events starting from offset ${processedCount} (${progressPercentage}% complete)`,
188-
);
189-
await processPairCreationEvents(
190-
events.map(event => event.get({ plain: true })),
191-
null,
192-
);
185+
// const progressPercentage = ((processedCount / LAST_BLOCK_ID) * 100).toFixed(2);
186+
// console.log(
187+
// `Processing batch of ${events.length} events starting from offset ${processedCount} (${progressPercentage}% complete)`,
188+
// );
189+
190+
const tx = await sequelize.transaction();
191+
try {
192+
await processPairCreationEvents(
193+
events.map(event => event.get({ plain: true })),
194+
tx,
195+
);
196+
await tx.commit();
197+
} catch (error) {
198+
await tx.rollback();
199+
console.error('[ERROR][DATA][DATA_CORRUPT] Error processing pair creation events:', error);
200+
}
193201
processedCount += events.length;
194202

195203
const endTime = Date.now();
@@ -202,4 +210,5 @@ export async function backfillPairEvents(
202210
}
203211

204212
console.log(`Backfill completed. Processed ${processedCount} events.`);
213+
process.exit(0);
205214
}

indexer/src/services/payload.ts

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -230,27 +230,12 @@ export async function processTransaction(
230230
);
231231

232232
const events = await Promise.all(eventsAttributes);
233-
// Note: Should not summit debug to the 'main' branch
234-
// const swapEvents = events.filter(event => event.name === 'SWAP');
235-
// console.log('swapEvents', JSON.stringify(swapEvents, null, 2));
236-
// const addLiquidityEvents = events.filter(event => event.name === 'ADD_LIQUIDITY');
237-
// console.log('addLiquidityEvents', JSON.stringify(addLiquidityEvents, null, 2));
238-
// const mintEvents = events.filter(event => event.name === 'MINT_EVENT');
239-
// console.log('mintEvents', JSON.stringify(mintEvents, null, 2));
240-
// console.log('------------------------------------- end -------------------------------');
241233
const eventsWithTransactionId = events.map(event => ({
242234
...event,
243235
transactionId,
244236
}));
245237
await Event.bulkCreate(eventsWithTransactionId, { transaction: tx });
246238

247-
// Process pair creation events
248-
try {
249-
await processPairCreationEvents(eventsWithTransactionId, tx);
250-
} catch (error) {
251-
console.error('Error processing pair creation events:', error);
252-
}
253-
254239
const signers = (cmdData.signers ?? []).map((signer: any, index: number) => ({
255240
address: signer.address,
256241
orderIndex: index,
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import { rootPgPool, sequelize } from '@/config/database';
2+
import { EventAttributes } from '@/models/event';
3+
import {
4+
EVENT_TYPES,
5+
EXCHANGE_TOKEN_EVENTS,
6+
MODULE_NAMES,
7+
processPairCreationEvents,
8+
} from '@/services/pair';
9+
import { isNullOrUndefined } from '@/utils/helpers';
10+
import { Transaction as SeqTransaction } from 'sequelize';
11+
12+
let lastHeightProcessed: number | null = null;
13+
const BLOCK_DEPTH = 6;
14+
15+
export async function startPairCreation() {
16+
let pairCreationTx: SeqTransaction;
17+
let events: EventAttributes[] = [];
18+
let currentHeight: number;
19+
20+
try {
21+
const result: any = await rootPgPool.query('SELECT max(height) FROM "Blocks"');
22+
const maxHeight = result.rows?.[0].max;
23+
24+
if (isNullOrUndefined(maxHeight)) {
25+
console.error('[ERROR][DATA][DATA_CORRUPT] Failed to get max height from database');
26+
return;
27+
}
28+
29+
currentHeight = maxHeight - BLOCK_DEPTH;
30+
if (lastHeightProcessed === null) {
31+
lastHeightProcessed = currentHeight - 10;
32+
}
33+
34+
if (currentHeight - lastHeightProcessed <= 0) {
35+
console.info('[INFO][SYNC][STREAMING] No new blocks to process', {
36+
lastHeightProcessed,
37+
currentHeight,
38+
});
39+
return;
40+
}
41+
42+
const EVENT_NAMES = [...EVENT_TYPES, ...EXCHANGE_TOKEN_EVENTS];
43+
const queryParams = [lastHeightProcessed, currentHeight, EVENT_NAMES, MODULE_NAMES];
44+
const query = `
45+
SELECT
46+
e.id,
47+
e."transactionId",
48+
e."chainId",
49+
e.module,
50+
e.name,
51+
e.params,
52+
e.qualname,
53+
e.requestkey,
54+
e."orderIndex"
55+
FROM "Events" e
56+
JOIN "Transactions" t ON t.id = e."transactionId"
57+
JOIN "Blocks" b ON b.id = t."blockId"
58+
WHERE b.height > $1 AND b.height <= $2
59+
AND e."name" = ANY($3::text[])
60+
AND e."module" = ANY($4::text[])
61+
`;
62+
63+
const eventsResult = await rootPgPool.query(query, queryParams);
64+
65+
events = eventsResult.rows.map(r => {
66+
return {
67+
name: r.name,
68+
module: r.module,
69+
params: r.params,
70+
transactionId: r.transactionId,
71+
blockId: r.blockId,
72+
height: r.height,
73+
createdAt: r.createdAt,
74+
chainId: r.chainId,
75+
qualname: r.qualname,
76+
requestkey: r.requestkey,
77+
orderIndex: r.orderIndex,
78+
id: r.id,
79+
};
80+
}) as EventAttributes[];
81+
} catch (error) {
82+
console.error('[ERROR][DATA][DATA_CORRUPT] Failed to get events:', error);
83+
return;
84+
}
85+
86+
try {
87+
pairCreationTx = await sequelize.transaction();
88+
} catch (error) {
89+
console.error('[ERROR][DATA][DATA_CORRUPT] Failed to create pair creation transaction:', error);
90+
return;
91+
}
92+
93+
try {
94+
await processPairCreationEvents(events, pairCreationTx);
95+
await pairCreationTx.commit();
96+
lastHeightProcessed = currentHeight;
97+
} catch (error) {
98+
await pairCreationTx.rollback();
99+
console.error('[ERROR][DATA][DATA_CORRUPT] Error processing pair creation events:', error);
100+
}
101+
}

indexer/src/services/streaming.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import { PriceUpdaterService } from './price/price-updater.service';
2525
import { defineCanonicalBaseline } from '@/services/define-canonical';
2626
import { startMissingBlocksBeforeStreamingProcess } from '@/services/missing';
2727
import { EventAttributes } from '@/models/event';
28+
import { startPairCreation } from '@/services/start-pair-creation';
2829

2930
const SYNC_BASE_URL = getRequiredEnvString('SYNC_BASE_URL');
3031
const SYNC_NETWORK = getRequiredEnvString('SYNC_NETWORK');
@@ -142,6 +143,9 @@ export async function startStreaming() {
142143

143144
processBlocks();
144145
backfillGuards();
146+
147+
// Schedule a periodic check of pair creation events every 2 minutes
148+
setInterval(startPairCreation, 1000 * 60 * 2);
145149
}
146150

147151
/**

0 commit comments

Comments
 (0)