Skip to content

Commit 57bfdda

Browse files
committed
fix: reverted orphan blocks algorithm behavior
1 parent eb3fa47 commit 57bfdda

File tree

3 files changed

+136
-46
lines changed

3 files changed

+136
-46
lines changed

indexer/src/services/define-canonical.ts

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,42 +5,37 @@ import { getRequiredEnvString } from '@/utils/helpers';
55
import { processPayload, saveBlock } from './streaming';
66
import { sequelize } from '@/config/database';
77
import { Transaction } from 'sequelize';
8+
import { BlockOutput } from '@/kadena-server/repository/application/block-repository';
89

910
const blockRepository = new BlockDbRepository();
1011
const SYNC_BASE_URL = getRequiredEnvString('SYNC_BASE_URL');
1112
const SYNC_NETWORK = getRequiredEnvString('SYNC_NETWORK');
1213

13-
export async function defineCanonicalBaseline(
14-
blockHash: string,
15-
parentHash: string,
16-
height: number,
17-
chainId: number,
18-
) {
14+
export async function defineCanonicalBaseline(blockHash: string) {
15+
let tipBlock: BlockOutput | null = null;
1916
try {
20-
const parentBlock = await blockRepository.getBlockByHash(parentHash);
21-
if (!parentBlock) {
22-
console.log(`[INFO][SYNC][DEFINE_CANONICAL] parentBlock not found. filling gaps ...`);
23-
console.log(`[chainId: ${chainId}, parentHash: ${parentHash}, height: ${height - 1}]`);
24-
await fillChainGapAndConfirmBlockPath(parentHash, height - 1, chainId);
25-
}
17+
tipBlock = await blockRepository.getBlockByHash(blockHash);
2618
} catch (error) {
27-
console.error(`[ERROR][SYNC][DEFINE_CANONICAL] Error filling gaps:`, error);
28-
return;
19+
console.error(`[FATAL][DB][BASELINE] There was an error with the database:`, error);
20+
process.exit(1);
2921
}
3022

31-
const tipBlock = await blockRepository.getBlockByHash(blockHash);
3223
if (!tipBlock) {
3324
// this scenario should not happen, but if it does, terminate the app.
34-
console.error(`[ERROR][SYNC][DEFINE_CANONICAL] Block ${blockHash} not found in database`);
25+
console.error(`[FATAL][DB][BASELINE] Block ${blockHash} not found in database`);
3526
process.exit(1);
3627
}
3728

3829
let tx: Transaction;
3930
try {
4031
tx = await sequelize.transaction();
4132
} catch (error) {
42-
console.error(`[ERROR][SYNC][DEFINE_CANONICAL] Failed to start transaction:`, error);
43-
throw error;
33+
console.error(
34+
`[ERROR][DB][BASELINE] Failed to start transaction for canonical algorithm:`,
35+
blockHash,
36+
error,
37+
);
38+
return;
4439
}
4540

4641
try {

indexer/src/services/missing.ts

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { rootPgPool, sequelize } from '@/config/database';
22
import { getRequiredEnvString } from '@/utils/helpers';
33
import { processPayload, saveBlock } from './streaming';
4+
import { Transaction } from 'sequelize';
45

56
const SYNC_BASE_URL = getRequiredEnvString('SYNC_BASE_URL');
67
const NETWORK_ID = getRequiredEnvString('SYNC_NETWORK');
@@ -148,3 +149,77 @@ async function fillChainGaps(
148149
console.info('[INFO][SYNC][MISSING] Processed:', chainIdDiff);
149150
}
150151
}
152+
153+
export async function fillChainGapsBeforeDefiningCanonicalBaseline({
154+
chainId,
155+
lastHeight,
156+
tx,
157+
}: {
158+
chainId: number;
159+
lastHeight: number;
160+
tx: Transaction;
161+
}): Promise<void> {
162+
try {
163+
console.info('[INFO][SYNC][MISSING] Filling initial chain gaps:', chainId);
164+
const cutUrl = `${SYNC_BASE_URL}/${NETWORK_ID}/cut`;
165+
const cutRes = await fetch(cutUrl, {
166+
method: 'GET',
167+
headers: {
168+
'Content-Type': 'application/json',
169+
},
170+
});
171+
172+
const cutData = await cutRes.json();
173+
174+
const chainsAndHashes = Object.keys(cutData.hashes).map(chainId => ({
175+
chainId,
176+
hash: cutData.hashes[chainId].hash,
177+
}));
178+
179+
const dbQuery = `
180+
SELECT MAX(height) as height
181+
FROM "Blocks"
182+
WHERE "chainId" = $1
183+
`;
184+
185+
const { rows } = await rootPgPool.query(dbQuery, [chainId]);
186+
187+
const fromHeight = rows[0].height + 1;
188+
const toHeight = lastHeight - 1;
189+
190+
if (fromHeight > toHeight) {
191+
console.info(`[INFO][SYNC][MISSING] No gaps to fill for chain ${chainId}`);
192+
return;
193+
}
194+
195+
const url = `${SYNC_BASE_URL}/${NETWORK_ID}/chain/${chainId}/block/branch?minheight=${fromHeight}&maxheight=${toHeight}`;
196+
197+
const res = await fetch(url, {
198+
method: 'POST',
199+
headers: {
200+
'Content-Type': 'application/json',
201+
Accept: 'application/json',
202+
},
203+
body: JSON.stringify({
204+
upper: [chainsAndHashes[chainId].hash],
205+
}),
206+
});
207+
208+
const data = await res.json();
209+
210+
const promises = data.items.map(async (item: any) => {
211+
const payload = processPayload(item.payloadWithOutputs);
212+
return saveBlock({ header: item.header, payload, canonical: true }, tx);
213+
});
214+
215+
await Promise.all(promises);
216+
217+
console.info(`[INFO][SYNC][MISSING] Initial chain gaps filled:`, chainId, fromHeight, toHeight);
218+
} catch (error) {
219+
console.error(
220+
`[FATAL][SYNC][MISSING] Error filling chain ${chainId} gaps before defining canonical baseline:`,
221+
error,
222+
);
223+
process.exit(1);
224+
}
225+
}

indexer/src/services/streaming.ts

Lines changed: 48 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ import { backfillGuards } from './guards';
2323
import { Transaction } from 'sequelize';
2424
import { PriceUpdaterService } from './price/price-updater.service';
2525
import { defineCanonicalBaseline } from '@/services/define-canonical';
26-
import { startMissingBlocksBeforeStreamingProcess } from '@/services/missing';
26+
import {
27+
fillChainGapsBeforeDefiningCanonicalBaseline,
28+
startMissingBlocksBeforeStreamingProcess,
29+
} from '@/services/missing';
2730
import { EventAttributes } from '@/models/event';
2831
import { startPairCreation } from '@/services/start-pair-creation';
2932

@@ -53,10 +56,7 @@ export async function startStreaming() {
5356
await startMissingBlocksBeforeStreamingProcess();
5457

5558
const nextBlocksToProcess: any[] = [];
56-
const blocksRecentlyProcessed = new Set<string>();
57-
58-
// Initialize price updater
59-
PriceUpdaterService.getInstance();
59+
const initialChainGapsAlreadyFilled = new Set<number>();
6060

6161
// Initialize EventSource connection to the blockchain node
6262
const eventSource = new EventSource(`${SYNC_BASE_URL}/${SYNC_NETWORK}/block/updates`);
@@ -77,35 +77,62 @@ export async function startStreaming() {
7777
};
7878

7979
const processBlock = async (block: any) => {
80-
const blockIdentifier = block.header.hash;
81-
82-
if (blocksRecentlyProcessed.has(blockIdentifier)) {
83-
await defineCanonicalBaseline(
84-
block.header.hash,
85-
block.header.parent,
86-
block.header.height,
87-
block.header.chainId,
80+
const blockHash = block.header.hash;
81+
82+
let blockInDatabase: Block | null = null;
83+
try {
84+
blockInDatabase = await Block.findOne({ where: { hash: blockHash } });
85+
} catch (error) {
86+
console.error(
87+
'[FATAL][DB][STREAMING] There was an error with the database:',
88+
blockHash,
89+
error,
90+
);
91+
process.exit(1);
92+
}
93+
94+
if (blockInDatabase) {
95+
await defineCanonicalBaseline(block.header.hash);
96+
return;
97+
}
98+
99+
let tx: Transaction;
100+
try {
101+
tx = await sequelize.transaction();
102+
} catch (error) {
103+
console.error(
104+
'[ERROR][DB][STREAMING] Failed to start transaction for new block:',
105+
blockHash,
106+
error,
88107
);
89108
return;
90109
}
91110

92-
const tx = await sequelize.transaction();
93111
try {
94112
const payload = processPayload(block.payloadWithOutputs);
95113
await saveBlock({ header: block.header, payload, canonical: null }, tx);
114+
115+
if (!initialChainGapsAlreadyFilled.has(block.header.chainId)) {
116+
initialChainGapsAlreadyFilled.add(block.header.chainId);
117+
await fillChainGapsBeforeDefiningCanonicalBaseline({
118+
chainId: block.header.chainId,
119+
lastHeight: block.header.height,
120+
tx,
121+
});
122+
}
123+
96124
await tx.commit();
97-
blocksRecentlyProcessed.add(blockIdentifier);
98125
} catch (error) {
99126
await tx.rollback();
127+
console.error(
128+
'[ERROR][DB][STREAMING] Failed to start transaction for new block:',
129+
blockHash,
130+
error,
131+
);
100132
return;
101133
}
102134

103-
await defineCanonicalBaseline(
104-
block.header.hash,
105-
block.header.parent,
106-
block.header.height,
107-
block.header.chainId,
108-
);
135+
await defineCanonicalBaseline(block.header.hash);
109136
};
110137

111138
const processBlocks = async () => {
@@ -134,13 +161,6 @@ export async function startStreaming() {
134161
setTimeout(processBlocks, 1000);
135162
};
136163

137-
setInterval(
138-
() => {
139-
blocksRecentlyProcessed.clear();
140-
},
141-
1000 * 60 * 60 * 1, // 1 hour
142-
);
143-
144164
processBlocks();
145165
backfillGuards();
146166

0 commit comments

Comments
 (0)