5 changes: 5 additions & 0 deletions indexer/src/index.ts
@@ -10,6 +10,7 @@ import { closeDatabase } from './config/database';
import { initializeDatabase } from './config/init';
import { startBackfillCoinbaseTransactions } from './services/sync/coinbase';
import { backfillBalances } from './services/sync/balances';
import { startMissingBlocks } from './services/sync/missing';

program
.option('-s, --streaming', 'Start streaming blockchain data')
@@ -18,6 +19,7 @@ program
.option('-f, --guards', 'Backfill the guards')
// this option shouldn't be used if you initialize the indexer from the beginning
.option('-c, --coinbase', 'Backfill coinbase transactions')
.option('-m, --missing', 'Backfill missing blocks')
.option('-z, --database', 'Init the database');

program.parse(process.argv);
@@ -44,6 +46,9 @@ async function main() {
process.exit(0);
} else if (options.coinbase) {
await startBackfillCoinbaseTransactions();
} else if (options.missing) {
await startMissingBlocks();
process.exit(0);
} else if (options.oldGraphql) {
await usePostgraphile();
} else if (options.graphql) {
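For orientation, here is a condensed, self-contained sketch of the wiring this hunk adds: a one-shot --missing flag that runs the backfill and exits, like the other backfill options. The surrounding program setup is assumed from the rest of index.ts; only the option itself is taken from the diff.

import { program } from 'commander';
import { startMissingBlocks } from './services/sync/missing';

program.option('-m, --missing', 'Backfill missing blocks');
program.parse(process.argv);
const options = program.opts();

async function main() {
  if (options.missing) {
    // One-shot job: fill the gaps, then exit instead of staying resident.
    await startMissingBlocks();
    process.exit(0);
  }
}

main();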
100 changes: 100 additions & 0 deletions indexer/src/services/sync/missing.ts
@@ -0,0 +1,100 @@
import { rootPgPool, sequelize } from '../../config/database';
import { getRequiredEnvString } from '../../utils/helpers';
import { processPayload, saveBlock } from './streaming';

const SYNC_BASE_URL = getRequiredEnvString('SYNC_BASE_URL');
const NETWORK_ID = getRequiredEnvString('SYNC_NETWORK');

export async function startMissingBlocks() {
  // Fetch the current cut to get the latest block hash for each chain.
  const url = `${SYNC_BASE_URL}/${NETWORK_ID}/cut`;
  const res = await fetch(url, {
    method: 'GET',
    headers: {
      'Content-Type': 'application/json',
    },
  });

  const data = await res.json();

  const chainsAndHashes = Object.keys(data.hashes).map(chainId => ({
    chainId,
    hash: data.hashes[chainId].hash,
  }));

  // For a single chain, find every contiguous range of heights with no stored block.
  const query = `
    SELECT DISTINCT
      b1."chainId",
      b1."chainwebVersion",
      b1.height + 1 AS from_height,
      MIN(b2.height) - 1 AS to_height,
      (MIN(b2.height) - b1.height - 1) AS diff
    FROM "Blocks" b1
    JOIN "Blocks" b2
      ON b1."chainId" = b2."chainId"
      AND b1."chainwebVersion" = b2."chainwebVersion"
      AND b2.height > b1.height
    WHERE b1."chainId" = $1
      AND NOT EXISTS (
        SELECT 1
        FROM "Blocks" b3
        WHERE b3."chainId" = b1."chainId"
          AND b3."chainwebVersion" = b1."chainwebVersion"
          AND b3.height = b1.height + 1
      )
    GROUP BY b1."chainId", b1."chainwebVersion", b1.height
    ORDER BY b1."chainId", b1."chainwebVersion", from_height;
  `;

  for (let i = 0; i < chainsAndHashes.length; i += 1) {
    const chainAndHash = chainsAndHashes[i];
    const { rows } = await rootPgPool.query(query, [chainAndHash.chainId]);

    for (const row of rows) {
      console.log('Row:', row);
      const THRESHOLD = 100;
      const totalHeightRange = row.to_height - row.from_height + 1;
      let processedHeight = 0;

      console.log('Starting:', chainAndHash.chainId, row.from_height, row.to_height);
      // Page through the branch endpoint in windows of THRESHOLD heights.
      for (let height = row.from_height; height <= row.to_height; height += THRESHOLD) {
        const minHeight = height;
        const maxHeight = Math.min(height + THRESHOLD - 1, row.to_height);
        const branchUrl = `${SYNC_BASE_URL}/${NETWORK_ID}/chain/${chainAndHash.chainId}/block/branch?minheight=${minHeight}&maxheight=${maxHeight}`;

        const branchRes = await fetch(branchUrl, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            Accept: 'application/json',
          },
          body: JSON.stringify({
            upper: [chainAndHash.hash],
          }),
        });

        const branchData = await branchRes.json();

        // Save the whole window inside one transaction.
        const tx = await sequelize.transaction();
        try {
          const promises = branchData.items.map(async (item: any) => {
            const payload = processPayload(item.payloadWithOutputs);
            return saveBlock({ header: item.header, payload }, tx);
          });

          await Promise.all(promises);
          await tx.commit();
        } catch (err) {
          await tx.rollback();
          throw err;
        }

        processedHeight += maxHeight - minHeight + 1;
        const progress = Math.min((processedHeight / totalHeightRange) * 100, 100).toFixed(2);

        console.log(`Chain ${chainAndHash.chainId}: ${progress}%`);
      }

      console.log('Processed:', chainAndHash);
    }
  }
}
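The inner loop above walks each gap in fixed-size windows so that a single branch request never asks for more than THRESHOLD headers. As a minimal, standalone sketch of that windowing arithmetic (the helper name is illustrative, not part of the diff):

// Split an inclusive [from, to] height range into windows of at most `size` heights.
function heightWindows(from: number, to: number, size: number): Array<{ min: number; max: number }> {
  const windows: Array<{ min: number; max: number }> = [];
  for (let height = from; height <= to; height += size) {
    windows.push({ min: height, max: Math.min(height + size - 1, to) });
  }
  return windows;
}

// e.g. heightWindows(100, 350, 100) yields {100-199}, {200-299}, {300-350}.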
17 changes: 9 additions & 8 deletions indexer/src/services/sync/streaming.ts
@@ -7,6 +7,7 @@ import Block, { BlockAttributes } from '../../models/block';
import { sequelize } from '../../config/database';
import StreamingError from '../../models/streaming-error';
import { backfillGuards } from './guards';
import { Transaction } from 'sequelize';

const SYNC_BASE_URL = getRequiredEnvString('SYNC_BASE_URL');
const SYNC_NETWORK = getRequiredEnvString('SYNC_NETWORK');
@@ -30,16 +31,20 @@ export async function startStreaming() {
return;
}
blocksAlreadyReceived.add(block.header.hash);
const blockData = await saveBlock({ header: block.header, payload });

const tx = await sequelize.transaction();
const blockData = await saveBlock({ header: block.header, payload }, tx);
if (blockData === null) {
await StreamingError.create({
hash: block.header.hash,
chainId: block.header.chainId,
});
await tx.rollback();
return;
}
await tx.commit();
} catch (error) {
console.log(error);
console.log('Event Error:', error);
}
});

@@ -55,7 +60,7 @@ export async function startStreaming() {
setInterval(backfillGuards, 1000 * 60 * 60); // every one hour
}

function processPayload(payload: any) {
export function processPayload(payload: any) {
const transactions = payload.transactions;
transactions.forEach((transaction: any) => {
transaction[0] = getDecoded(transaction[0]);
@@ -79,13 +84,11 @@ function processPayload(payload: any) {
return payloadData;
}

async function saveBlock(parsedData: any): Promise<DispatchInfo | null> {
export async function saveBlock(parsedData: any, tx?: Transaction): Promise<DispatchInfo | null> {
const headerData = parsedData.header;
const payloadData = parsedData.payload;
const transactions = payloadData.transactions || [];

const tx = await sequelize.transaction();

try {
const blockAttribute = {
nonce: headerData.nonce,
@@ -120,7 +123,6 @@ async function saveBlock(parsedData: any): Promise<DispatchInfo | null> {
eventsCreated.map(t => `${t.module}.${t.name}`).filter(Boolean),
);

await tx.commit();
return {
hash: createdBlock.hash,
chainId: createdBlock.chainId.toString(),
@@ -129,7 +131,6 @@ async function saveBlock(parsedData: any): Promise<DispatchInfo | null> {
qualifiedEventNames: Array.from(uniqueQualifiedEventNames),
};
} catch (error) {
await tx.rollback();
console.error(`Error saving block to the database: ${error}`);
return null;
}
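With this refactor, saveBlock no longer opens, commits, or rolls back its own transaction; the caller owns the transaction's lifecycle and passes it in, which is what both the streaming handler and the new missing-blocks job do. A minimal sketch of that calling pattern, assuming the same sequelize instance and an already-parsed block (the persistBlock wrapper is hypothetical):

import { sequelize } from '../../config/database';
import { saveBlock } from './streaming';

async function persistBlock(parsedData: any) {
  const tx = await sequelize.transaction();
  try {
    const result = await saveBlock(parsedData, tx);
    if (result === null) {
      // saveBlock logs its own error and returns null, so the caller discards the partial writes.
      await tx.rollback();
      return null;
    }
    await tx.commit();
    return result;
  } catch (err) {
    await tx.rollback();
    throw err;
  }
}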