Skip to content

Commit 4a5c779

Browse files
authored
Merge pull request #215 from hack-a-chain-software/missing-blocks
fix: missing blocks
2 parents 447332b + bb65349 commit 4a5c779

File tree

3 files changed

+114
-8
lines changed

3 files changed

+114
-8
lines changed

indexer/src/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { closeDatabase } from './config/database';
1010
import { initializeDatabase } from './config/init';
1111
import { startBackfillCoinbaseTransactions } from './services/sync/coinbase';
1212
import { backfillBalances } from './services/sync/balances';
13+
import { startMissingBlocks } from './services/sync/missing';
1314

1415
program
1516
.option('-s, --streaming', 'Start streaming blockchain data')
@@ -18,6 +19,7 @@ program
1819
.option('-f, --guards', 'Backfill the guards')
1920
// this option shouldn't be used if you initialize the indexer from the beginning
2021
.option('-c, --coinbase', 'Backfill coinbase transactions')
22+
.option('-m, --missing', 'Missing blocks')
2123
.option('-z, --database', 'Init the database');
2224

2325
program.parse(process.argv);
@@ -44,6 +46,9 @@ async function main() {
4446
process.exit(0);
4547
} else if (options.coinbase) {
4648
await startBackfillCoinbaseTransactions();
49+
} else if (options.missing) {
50+
await startMissingBlocks();
51+
process.exit(0);
4752
} else if (options.oldGraphql) {
4853
await usePostgraphile();
4954
} else if (options.graphql) {
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import { rootPgPool, sequelize } from '../../config/database';
2+
import { getRequiredEnvString } from '../../utils/helpers';
3+
import { processPayload, saveBlock } from './streaming';
4+
5+
const SYNC_BASE_URL = getRequiredEnvString('SYNC_BASE_URL');
6+
const NETWORK_ID = getRequiredEnvString('SYNC_NETWORK');
7+
8+
export async function startMissingBlocks() {
9+
const url = `${SYNC_BASE_URL}/${NETWORK_ID}/cut`;
10+
const res = await fetch(url, {
11+
method: 'GET',
12+
headers: {
13+
'Content-Type': 'application/json',
14+
},
15+
});
16+
17+
const data = await res.json();
18+
19+
const chainsAndHashes = Object.keys(data.hashes).map(chainId => ({
20+
chainId,
21+
hash: data.hashes[chainId].hash,
22+
}));
23+
24+
const query = `
25+
SELECT DISTINCT
26+
b1."chainId",
27+
b1."chainwebVersion",
28+
b1.height + 1 AS from_height,
29+
MIN(b2.height) - 1 AS to_height,
30+
(MIN(b2.height) - b1.height - 1) AS diff
31+
FROM "Blocks" b1
32+
JOIN "Blocks" b2
33+
ON b1."chainId" = b2."chainId"
34+
AND b1."chainwebVersion" = b2."chainwebVersion"
35+
AND b2.height > b1.height
36+
WHERE b1."chainId" = $1
37+
AND NOT EXISTS (
38+
SELECT 1
39+
FROM "Blocks" b3
40+
WHERE b3."chainId" = b1."chainId"
41+
AND b3."chainwebVersion" = b1."chainwebVersion"
42+
AND b3.height = b1.height + 1
43+
)
44+
GROUP BY b1."chainId", b1."chainwebVersion", b1.height
45+
ORDER BY b1."chainId", b1."chainwebVersion", from_height;
46+
`;
47+
48+
for (let i = 0; i < chainsAndHashes.length; i += 1) {
49+
const chainAndHash = chainsAndHashes[i];
50+
const { rows } = await rootPgPool.query(query, [chainAndHash.chainId]);
51+
52+
for (const row of rows) {
53+
console.log('Row:', row);
54+
const THRESHOLD = 100;
55+
const totalHeightRange = row.to_height - row.from_height + 1;
56+
let processedHeight = 0;
57+
58+
console.log('Starting:', chainAndHash.chainId, row.from_height, row.to_height);
59+
for (let i = row.from_height; i <= row.to_height; i += THRESHOLD) {
60+
let minHeight = i;
61+
let maxHeight = Math.min(i + THRESHOLD - 1, row.to_height);
62+
const url = `${SYNC_BASE_URL}/${NETWORK_ID}/chain/${chainAndHash.chainId}/block/branch?minheight=${minHeight}&maxheight=${maxHeight}`;
63+
64+
const res = await fetch(url, {
65+
method: 'POST',
66+
headers: {
67+
'Content-Type': 'application/json',
68+
Accept: 'application/json',
69+
},
70+
body: JSON.stringify({
71+
upper: [chainAndHash.hash],
72+
}),
73+
});
74+
75+
const data = await res.json();
76+
77+
const tx = await sequelize.transaction();
78+
try {
79+
const promises = data.items.map(async (item: any) => {
80+
const payload = processPayload(item.payloadWithOutputs);
81+
return saveBlock({ header: item.header, payload }, tx);
82+
});
83+
84+
await Promise.all(promises);
85+
await tx.commit();
86+
} catch (err) {
87+
await tx.rollback();
88+
throw err;
89+
}
90+
91+
processedHeight += maxHeight - minHeight + 1;
92+
const progress = Math.min((processedHeight / totalHeightRange) * 100, 100).toFixed(2);
93+
94+
console.log(`Chain ${chainAndHash.chainId}: ${progress}%`);
95+
}
96+
97+
console.log('Processed:', chainAndHash);
98+
}
99+
}
100+
}

indexer/src/services/sync/streaming.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import Block, { BlockAttributes } from '../../models/block';
77
import { sequelize } from '../../config/database';
88
import StreamingError from '../../models/streaming-error';
99
import { backfillGuards } from './guards';
10+
import { Transaction } from 'sequelize';
1011

1112
const SYNC_BASE_URL = getRequiredEnvString('SYNC_BASE_URL');
1213
const SYNC_NETWORK = getRequiredEnvString('SYNC_NETWORK');
@@ -30,16 +31,20 @@ export async function startStreaming() {
3031
return;
3132
}
3233
blocksAlreadyReceived.add(block.header.hash);
33-
const blockData = await saveBlock({ header: block.header, payload });
34+
35+
const tx = await sequelize.transaction();
36+
const blockData = await saveBlock({ header: block.header, payload }, tx);
3437
if (blockData === null) {
3538
await StreamingError.create({
3639
hash: block.header.hash,
3740
chainId: block.header.chainId,
3841
});
42+
await tx.rollback();
3943
return;
4044
}
45+
await tx.commit();
4146
} catch (error) {
42-
console.log(error);
47+
console.log('Event Error:', error);
4348
}
4449
});
4550

@@ -55,7 +60,7 @@ export async function startStreaming() {
5560
setInterval(backfillGuards, 1000 * 60 * 60); // every one hour
5661
}
5762

58-
function processPayload(payload: any) {
63+
export function processPayload(payload: any) {
5964
const transactions = payload.transactions;
6065
transactions.forEach((transaction: any) => {
6166
transaction[0] = getDecoded(transaction[0]);
@@ -79,13 +84,11 @@ function processPayload(payload: any) {
7984
return payloadData;
8085
}
8186

82-
async function saveBlock(parsedData: any): Promise<DispatchInfo | null> {
87+
export async function saveBlock(parsedData: any, tx?: Transaction): Promise<DispatchInfo | null> {
8388
const headerData = parsedData.header;
8489
const payloadData = parsedData.payload;
8590
const transactions = payloadData.transactions || [];
8691

87-
const tx = await sequelize.transaction();
88-
8992
try {
9093
const blockAttribute = {
9194
nonce: headerData.nonce,
@@ -120,7 +123,6 @@ async function saveBlock(parsedData: any): Promise<DispatchInfo | null> {
120123
eventsCreated.map(t => `${t.module}.${t.name}`).filter(Boolean),
121124
);
122125

123-
await tx.commit();
124126
return {
125127
hash: createdBlock.hash,
126128
chainId: createdBlock.chainId.toString(),
@@ -129,7 +131,6 @@ async function saveBlock(parsedData: any): Promise<DispatchInfo | null> {
129131
qualifiedEventNames: Array.from(uniqueQualifiedEventNames),
130132
};
131133
} catch (error) {
132-
await tx.rollback();
133134
console.error(`Error saving block to the database: ${error}`);
134135
return null;
135136
}

0 commit comments

Comments
 (0)