Skip to content

Commit 4f7bde7

Browse files
committed
fix: reverted orphan blocks algorithm behavior
1 parent c57902d commit 4f7bde7

File tree

3 files changed

+64
-173
lines changed

3 files changed

+64
-173
lines changed

indexer/src/services/define-canonical.ts

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,42 +5,37 @@ import { getRequiredEnvString } from '@/utils/helpers';
55
import { processPayload, saveBlock } from './streaming';
66
import { sequelize } from '@/config/database';
77
import { Transaction } from 'sequelize';
8+
import { BlockOutput } from '@/kadena-server/repository/application/block-repository';
89

910
const blockRepository = new BlockDbRepository();
1011
const SYNC_BASE_URL = getRequiredEnvString('SYNC_BASE_URL');
1112
const SYNC_NETWORK = getRequiredEnvString('SYNC_NETWORK');
1213

13-
export async function defineCanonicalBaseline(
14-
blockHash: string,
15-
parentHash: string,
16-
height: number,
17-
chainId: number,
18-
) {
14+
export async function defineCanonicalBaseline(blockHash: string) {
15+
let tipBlock: BlockOutput | null = null;
1916
try {
20-
const parentBlock = await blockRepository.getBlockByHash(parentHash);
21-
if (!parentBlock) {
22-
console.log(`[INFO][SYNC][DEFINE_CANONICAL] parentBlock not found. filling gaps ...`);
23-
console.log(`[chainId: ${chainId}, parentHash: ${parentHash}, height: ${height - 1}]`);
24-
await fillChainGapAndConfirmBlockPath(parentHash, height - 1, chainId);
25-
}
17+
tipBlock = await blockRepository.getBlockByHash(blockHash);
2618
} catch (error) {
27-
console.error(`[ERROR][SYNC][DEFINE_CANONICAL] Error filling gaps:`, error);
28-
return;
19+
console.error(`[FATAL][DB][BASELINE] There was an error with the database:`, error);
20+
process.exit(1);
2921
}
3022

31-
const tipBlock = await blockRepository.getBlockByHash(blockHash);
3223
if (!tipBlock) {
3324
// this scenario should not happen, but if it does, terminate the app.
34-
console.error(`[ERROR][DB][DEFINE_CANONICAL] Block ${blockHash} not found in database`);
25+
console.error(`[FATAL][DB][BASELINE] Block ${blockHash} not found in database`);
3526
process.exit(1);
3627
}
3728

3829
let tx: Transaction;
3930
try {
4031
tx = await sequelize.transaction();
4132
} catch (error) {
42-
console.error(`[ERROR][SYNC][DEFINE_CANONICAL] Failed to start transaction:`, error);
43-
throw error;
33+
console.error(
34+
`[ERROR][DB][BASELINE] Failed to start transaction for canonical algorithm:`,
35+
blockHash,
36+
error,
37+
);
38+
return;
4439
}
4540

4641
try {

indexer/src/services/missing.ts

Lines changed: 8 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { rootPgPool, sequelize } from '@/config/database';
22
import { getRequiredEnvString } from '@/utils/helpers';
33
import { processPayload, saveBlock } from './streaming';
4+
import { Transaction } from 'sequelize';
45

56
const SYNC_BASE_URL = getRequiredEnvString('SYNC_BASE_URL');
67
const NETWORK_ID = getRequiredEnvString('SYNC_NETWORK');
@@ -185,6 +186,11 @@ export async function fillChainGapsBeforeDefiningCanonicalBaseline({
185186
const fromHeight = rows[0].height + 1;
186187
const toHeight = lastHeight - 1;
187188

189+
if (fromHeight > toHeight) {
190+
console.info(`[INFO][SYNC][MISSING] No gaps to fill for chain ${chainId}`);
191+
return;
192+
}
193+
188194
const url = `${SYNC_BASE_URL}/${NETWORK_ID}/chain/${chainId}/block/branch?minheight=${fromHeight}&maxheight=${toHeight}`;
189195

190196
const res = await fetch(url, {
@@ -210,132 +216,9 @@ export async function fillChainGapsBeforeDefiningCanonicalBaseline({
210216
console.info(`[INFO][SYNC][MISSING] Initial chain gaps filled:`, chainId, fromHeight, toHeight);
211217
} catch (error) {
212218
console.error(
213-
`[ERROR][SYNC][SYNC_TIMEOUT] Error filling chain ${chainId} gaps before defining canonical baseline:`,
214-
error,
215-
);
216-
}
217-
}
218-
219-
export async function checkCanonicalPathForAllChains() {
220-
const chainsSynced = [];
221-
const chains = Array.from({ length: 20 }, (_, i) => i.toString());
222-
223-
try {
224-
for (const chainId of chains) {
225-
const query = `
226-
SELECT hash
227-
FROM "Blocks"
228-
WHERE "chainId" = $1 AND height = (SELECT MAX(height) FROM "Blocks" WHERE "chainId" = $1)
229-
`;
230-
const { rows } = await rootPgPool.query(query, [chainId]);
231-
const blockHash = rows?.[0]?.hash;
232-
const isSynced = await checkCanonicalPathStartingFromSpecificBlock(blockHash);
233-
234-
if (isSynced) {
235-
chainsSynced.push(chainId);
236-
}
237-
}
238-
239-
if (chainsSynced.length === 0) {
240-
console.info('[INFO][SYNC][MISSING] No chains to sync');
241-
} else {
242-
console.info(
243-
`[INFO][SYNC][MISSING] Successfully synced ${chainsSynced.length} chains: ${chainsSynced.join(
244-
', ',
245-
)}`,
246-
);
247-
}
248-
} catch (error) {
249-
console.error(
250-
`[ERROR][SYNC][SYNC_TIMEOUT] Error checking canonical path for all chains:`,
219+
`[FATAL][SYNC][MISSING] Error filling chain ${chainId} gaps before defining canonical baseline:`,
251220
error,
252221
);
253-
}
254-
}
255-
256-
async function checkCanonicalPathStartingFromSpecificBlock(
257-
blockHash: string,
258-
maxAttempts = 20,
259-
): Promise<boolean> {
260-
let attempts = 0;
261-
let ancestors = await findCanonicalBaseline(blockHash);
262-
263-
while (ancestors.length < CANONICAL_BASE_LINE_LENGTH && attempts < maxAttempts) {
264-
console.info(
265-
`[INFO][SYNC][MISSING] Attempt ${attempts + 1}: Found ${ancestors.length} blocks, need ${CANONICAL_BASE_LINE_LENGTH}`,
266-
);
267-
268-
// Get the lowest block we have
269-
const lowestBlock = ancestors[ancestors.length - 1];
270-
271-
// Fetch and save the parent block
272-
await fetchAndSaveBlocks(lowestBlock.chainId, lowestBlock.height - 1);
273-
274-
// Recalculate ancestors
275-
ancestors = await findCanonicalBaseline(blockHash);
276-
attempts++;
277-
}
278-
279-
if (ancestors.length < CANONICAL_BASE_LINE_LENGTH) {
280-
throw new Error(
281-
`[ERROR][SYNC][SYNC_TIMEOUT] Failed to build complete canonical path after ${maxAttempts} attempts. Only found ${ancestors.length} blocks.`,
282-
);
283-
}
284-
285-
return attempts > 0;
286-
}
287-
288-
async function findCanonicalBaseline(hash: string) {
289-
const query = `
290-
WITH RECURSIVE BlockAncestors AS (
291-
SELECT hash, parent, 1 AS depth, height, "chainId"
292-
FROM "Blocks"
293-
WHERE hash = $1
294-
UNION ALL
295-
SELECT b.hash, b.parent, d.depth + 1 AS depth, b.height, b."chainId"
296-
FROM BlockAncestors d
297-
JOIN "Blocks" b ON d.parent = b.hash
298-
WHERE d.depth < $2
299-
)
300-
SELECT parent as hash, height, "chainId"
301-
FROM BlockAncestors
302-
ORDER BY height DESC
303-
`;
304-
305-
const { rows } = await rootPgPool.query(query, [hash, CANONICAL_BASE_LINE_LENGTH]);
306-
return rows;
307-
}
308-
309-
async function fetchAndSaveBlocks(chainId: number, height: number) {
310-
const url = `${SYNC_BASE_URL}/${NETWORK_ID}/chain/${chainId}/block?minheight=${height}&maxheight=${height}`;
311-
const res = await fetch(url, {
312-
method: 'GET',
313-
headers: {
314-
'Content-Type': 'application/json',
315-
},
316-
});
317-
318-
const data = await res.json();
319-
320-
const tx = await sequelize.transaction();
321-
try {
322-
const promises = data.items.map(async (item: any) => {
323-
const payload = processPayload(item.payloadWithOutputs);
324-
const block = await Block.findOne({ where: { hash: item.header.hash } });
325-
if (block) {
326-
return Promise.resolve();
327-
} else {
328-
return saveBlock({ header: item.header, payload, canonical: true }, tx);
329-
}
330-
});
331-
332-
await Promise.all(promises);
333-
await tx.commit();
334-
console.info(
335-
`[INFO][SYNC][MISSING] Successfully synced blocks at height ${height} for chain ${chainId}`,
336-
);
337-
} catch (err) {
338-
await tx.rollback();
339-
throw err;
222+
process.exit(1);
340223
}
341224
}

indexer/src/services/streaming.ts

Lines changed: 43 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ import Block, { BlockAttributes } from '@/models/block';
2121
import { sequelize } from '@/config/database';
2222
import { backfillGuards } from './guards';
2323
import { Transaction } from 'sequelize';
24-
import { PriceUpdaterService } from './price/price-updater.service';
2524
import { defineCanonicalBaseline } from '@/services/define-canonical';
26-
import { startMissingBlocksBeforeStreamingProcess } from '@/services/missing';
25+
import {
26+
fillChainGapsBeforeDefiningCanonicalBaseline,
27+
startMissingBlocksBeforeStreamingProcess,
28+
} from '@/services/missing';
2729
import { EventAttributes } from '@/models/event';
2830
import { startPairCreation } from '@/services/start-pair-creation';
2931

@@ -53,10 +55,7 @@ export async function startStreaming() {
5355
await startMissingBlocksBeforeStreamingProcess();
5456

5557
const nextBlocksToProcess: any[] = [];
56-
const blocksRecentlyProcessed = new Set<string>();
57-
58-
// Initialize price updater
59-
PriceUpdaterService.getInstance();
58+
const initialChainGapsAlreadyFilled = new Set<number>();
6059

6160
// Initialize EventSource connection to the blockchain node
6261
const eventSource = new EventSource(`${SYNC_BASE_URL}/${SYNC_NETWORK}/block/updates`);
@@ -78,27 +77,54 @@ export async function startStreaming() {
7877
};
7978

8079
const processBlock = async (block: any) => {
81-
const blockIdentifier = block.header.hash;
82-
83-
if (blocksRecentlyProcessed.has(blockIdentifier)) {
84-
await defineCanonicalBaseline(
85-
block.header.hash,
86-
block.header.parent,
87-
block.header.height,
88-
block.header.chainId,
80+
const blockHash = block.header.hash;
81+
82+
let blockInDatabase: Block | null = null;
83+
try {
84+
blockInDatabase = await Block.findOne({ where: { hash: blockHash } });
85+
} catch (error) {
86+
console.error(
87+
'[ERROR][DB][STREAMING] There was an error with the database:',
88+
blockHash,
89+
error,
90+
);
91+
process.exit(1);
92+
}
93+
94+
if (blockInDatabase) {
95+
await defineCanonicalBaseline(block.header.hash);
96+
return;
97+
}
98+
99+
let tx: Transaction;
100+
try {
101+
tx = await sequelize.transaction();
102+
} catch (error) {
103+
console.error(
104+
'[ERROR][DB][STREAMING] Failed to start transaction for new block:',
105+
blockHash,
106+
error,
89107
);
90108
return;
91109
}
92110

93-
const tx = await sequelize.transaction();
94111
try {
95112
const payload = processPayload(block.payloadWithOutputs);
96113

97114
// Save the block data and process its transactions
98115
// TODO: [CONSISTENCY] Validate saveBlock result; if null/failed, handle with rollback + DLQ + metric to avoid partial commits
99116
await saveBlock({ header: block.header, payload, canonical: null }, tx);
117+
118+
if (!initialChainGapsAlreadyFilled.has(block.header.chainId)) {
119+
initialChainGapsAlreadyFilled.add(block.header.chainId);
120+
await fillChainGapsBeforeDefiningCanonicalBaseline({
121+
chainId: block.header.chainId,
122+
lastHeight: block.header.height,
123+
tx,
124+
});
125+
}
126+
100127
await tx.commit();
101-
blocksRecentlyProcessed.add(blockIdentifier);
102128
} catch (error) {
103129
await tx.rollback();
104130
// TODO: [OBS][METRICS] Increment 'stream.block_failures' with tags { chainId, reason: 'processing' }
@@ -112,12 +138,7 @@ export async function startStreaming() {
112138
return;
113139
}
114140

115-
await defineCanonicalBaseline(
116-
block.header.hash,
117-
block.header.parent,
118-
block.header.height,
119-
block.header.chainId,
120-
);
141+
await defineCanonicalBaseline(block.header.hash);
121142
};
122143

123144
const processBlocks = async () => {
@@ -146,14 +167,6 @@ export async function startStreaming() {
146167
setTimeout(processBlocks, 1000);
147168
};
148169

149-
setInterval(
150-
() => {
151-
blocksRecentlyProcessed.clear();
152-
console.info('[INFO][SYNC][STREAMING] blocksRecentlyProcessed cleared');
153-
},
154-
1000 * 60 * 60 * 1, // 1 hour
155-
);
156-
157170
processBlocks();
158171
backfillGuards();
159172

0 commit comments

Comments (0)