Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 13 additions & 18 deletions indexer/src/services/define-canonical.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,37 @@ import { getRequiredEnvString } from '@/utils/helpers';
import { processPayload, saveBlock } from './streaming';
import { sequelize } from '@/config/database';
import { Transaction } from 'sequelize';
import { BlockOutput } from '@/kadena-server/repository/application/block-repository';

const blockRepository = new BlockDbRepository();
const SYNC_BASE_URL = getRequiredEnvString('SYNC_BASE_URL');
const SYNC_NETWORK = getRequiredEnvString('SYNC_NETWORK');

export async function defineCanonicalBaseline(
blockHash: string,
parentHash: string,
height: number,
chainId: number,
) {
export async function defineCanonicalBaseline(blockHash: string) {
let tipBlock: BlockOutput | null = null;
try {
const parentBlock = await blockRepository.getBlockByHash(parentHash);
if (!parentBlock) {
console.log(`[INFO][SYNC][DEFINE_CANONICAL] parentBlock not found. filling gaps ...`);
console.log(`[chainId: ${chainId}, parentHash: ${parentHash}, height: ${height - 1}]`);
await fillChainGapAndConfirmBlockPath(parentHash, height - 1, chainId);
}
tipBlock = await blockRepository.getBlockByHash(blockHash);
} catch (error) {
console.error(`[ERROR][SYNC][DEFINE_CANONICAL] Error filling gaps:`, error);
return;
console.error(`[FATAL][DB][BASELINE] There was an error with the database:`, error);
process.exit(1);
}

const tipBlock = await blockRepository.getBlockByHash(blockHash);
if (!tipBlock) {
// this scenario should not happen, but if it does, terminate the app.
console.error(`[ERROR][DB][DEFINE_CANONICAL] Block ${blockHash} not found in database`);
console.error(`[FATAL][DB][BASELINE] Block ${blockHash} not found in database`);
process.exit(1);
}

let tx: Transaction;
try {
tx = await sequelize.transaction();
} catch (error) {
console.error(`[ERROR][SYNC][DEFINE_CANONICAL] Failed to start transaction:`, error);
throw error;
console.error(
`[ERROR][DB][BASELINE] Failed to start transaction for canonical algorithm:`,
blockHash,
error,
);
return;
}

try {
Expand Down
133 changes: 8 additions & 125 deletions indexer/src/services/missing.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { rootPgPool, sequelize } from '@/config/database';
import { getRequiredEnvString } from '@/utils/helpers';
import { processPayload, saveBlock } from './streaming';
import { Transaction } from 'sequelize';

const SYNC_BASE_URL = getRequiredEnvString('SYNC_BASE_URL');
const NETWORK_ID = getRequiredEnvString('SYNC_NETWORK');
Expand Down Expand Up @@ -185,6 +186,11 @@ export async function fillChainGapsBeforeDefiningCanonicalBaseline({
const fromHeight = rows[0].height + 1;
const toHeight = lastHeight - 1;

if (fromHeight > toHeight) {
console.info(`[INFO][SYNC][MISSING] No gaps to fill for chain ${chainId}`);
return;
}

const url = `${SYNC_BASE_URL}/${NETWORK_ID}/chain/${chainId}/block/branch?minheight=${fromHeight}&maxheight=${toHeight}`;

const res = await fetch(url, {
Expand All @@ -210,132 +216,9 @@ export async function fillChainGapsBeforeDefiningCanonicalBaseline({
console.info(`[INFO][SYNC][MISSING] Initial chain gaps filled:`, chainId, fromHeight, toHeight);
} catch (error) {
console.error(
`[ERROR][SYNC][SYNC_TIMEOUT] Error filling chain ${chainId} gaps before defining canonical baseline:`,
error,
);
}
}

/**
 * Verifies (and repairs, by backfilling) the canonical ancestor path of every
 * chain, starting from each chain's highest stored block.
 *
 * For each chain id the highest block's hash is looked up, then
 * checkCanonicalPathStartingFromSpecificBlock() backfills any missing
 * ancestors. A summary of chains that needed syncing is logged at the end.
 * Errors are logged, not rethrown — this is a best-effort maintenance pass.
 */
export async function checkCanonicalPathForAllChains() {
  const chainsSynced: string[] = [];
  // Chain ids are passed as strings to the parameterized query below.
  // NOTE(review): assumes a fixed 20-chain network — confirm against config.
  const chains = Array.from({ length: 20 }, (_, i) => i.toString());

  try {
    for (const chainId of chains) {
      // Hash of the highest block currently stored for this chain.
      const query = `
        SELECT hash
        FROM "Blocks"
        WHERE "chainId" = $1 AND height = (SELECT MAX(height) FROM "Blocks" WHERE "chainId" = $1)
      `;
      const { rows } = await rootPgPool.query(query, [chainId]);
      const blockHash = rows?.[0]?.hash;

      // Robustness: a chain with no stored blocks yields undefined here;
      // skip it instead of walking ancestry from a non-existent hash.
      if (!blockHash) {
        continue;
      }

      const isSynced = await checkCanonicalPathStartingFromSpecificBlock(blockHash);

      if (isSynced) {
        chainsSynced.push(chainId);
      }
    }

    if (chainsSynced.length === 0) {
      console.info('[INFO][SYNC][MISSING] No chains to sync');
    } else {
      console.info(
        `[INFO][SYNC][MISSING] Successfully synced ${chainsSynced.length} chains: ${chainsSynced.join(
          ', ',
        )}`,
      );
    }
  } catch (error) {
    // Bug fix: a second message string referencing `chainId` was passed here,
    // but `chainId` is scoped to the for..of loop inside `try` and is not
    // visible in this catch block (compile error); it was removed.
    console.error(
      `[ERROR][SYNC][SYNC_TIMEOUT] Error checking canonical path for all chains:`,
      error,
    );
  }
}

/**
 * Ensures a complete canonical baseline exists below the given block,
 * backfilling one parent height per iteration until
 * CANONICAL_BASE_LINE_LENGTH ancestors are present or the attempt budget
 * runs out.
 *
 * @param blockHash - tip block whose ancestry must be complete
 * @param maxAttempts - upper bound on backfill iterations
 * @returns true when at least one backfill iteration was needed
 * @throws Error when the path is still incomplete after maxAttempts attempts
 */
async function checkCanonicalPathStartingFromSpecificBlock(
  blockHash: string,
  maxAttempts = 20,
): Promise<boolean> {
  let iterations = 0;
  let baseline = await findCanonicalBaseline(blockHash);

  while (baseline.length < CANONICAL_BASE_LINE_LENGTH && iterations < maxAttempts) {
    console.info(
      `[INFO][SYNC][MISSING] Attempt ${iterations + 1}: Found ${baseline.length} blocks, need ${CANONICAL_BASE_LINE_LENGTH}`,
    );

    // The deepest ancestor we currently hold; its parent is the next gap.
    const deepest = baseline[baseline.length - 1];
    await fetchAndSaveBlocks(deepest.chainId, deepest.height - 1);

    // Re-resolve the ancestry now that one more height has been persisted.
    baseline = await findCanonicalBaseline(blockHash);
    iterations += 1;
  }

  if (baseline.length < CANONICAL_BASE_LINE_LENGTH) {
    throw new Error(
      `[ERROR][SYNC][SYNC_TIMEOUT] Failed to build complete canonical path after ${maxAttempts} attempts. Only found ${baseline.length} blocks.`,
    );
  }

  return iterations > 0;
}

/**
 * Resolves up to CANONICAL_BASE_LINE_LENGTH ancestors of a block by walking
 * the parent chain with a recursive CTE.
 *
 * @param hash - hash of the block whose ancestry is resolved
 * @returns rows of { hash, height, chainId } ordered by height descending
 */
async function findCanonicalBaseline(hash: string) {
  const ancestorsQuery = `
    WITH RECURSIVE BlockAncestors AS (
      SELECT hash, parent, 1 AS depth, height, "chainId"
      FROM "Blocks"
      WHERE hash = $1
      UNION ALL
      SELECT b.hash, b.parent, d.depth + 1 AS depth, b.height, b."chainId"
      FROM BlockAncestors d
      JOIN "Blocks" b ON d.parent = b.hash
      WHERE d.depth < $2
    )
    SELECT parent as hash, height, "chainId"
    FROM BlockAncestors
    ORDER BY height DESC
  `;

  const result = await rootPgPool.query(ancestorsQuery, [hash, CANONICAL_BASE_LINE_LENGTH]);
  return result.rows;
}

/**
 * Fetches the block(s) at a single height for one chain from the sync node
 * and persists any that are not already stored, all in one transaction.
 *
 * @param chainId - chain to backfill
 * @param height - exact height to fetch (minheight === maxheight)
 * @throws rethrows any persistence error after rolling the transaction back
 */
async function fetchAndSaveBlocks(chainId: number, height: number) {
  const url = `${SYNC_BASE_URL}/${NETWORK_ID}/chain/${chainId}/block?minheight=${height}&maxheight=${height}`;
  const res = await fetch(url, {
    method: 'GET',
    headers: {
      'Content-Type': 'application/json',
    },
  });

  // NOTE(review): res.ok is not checked and the JSON shape is not validated;
  // a non-2xx response or unexpected body surfaces as a TypeError on
  // data.items below — confirm whether the sync node can return errors here.
  const data = await res.json();

  const tx = await sequelize.transaction();
  try {
    const promises = data.items.map(async (item: any) => {
      const payload = processPayload(item.payloadWithOutputs);
      const block = await Block.findOne({ where: { hash: item.header.hash } });
      if (block) {
        // Already persisted — nothing to do for this item.
        return Promise.resolve();
      } else {
        return saveBlock({ header: item.header, payload, canonical: true }, tx);
      }
    });

    await Promise.all(promises);
    await tx.commit();
    console.info(
      `[INFO][SYNC][MISSING] Successfully synced blocks at height ${height} for chain ${chainId}`,
    );
  } catch (err) {
    await tx.rollback();
    // Bug fix: a `process.exit(1)` statement followed `throw err;` and was
    // unreachable dead code; rethrow and let the caller decide how to react.
    throw err;
  }
}
73 changes: 43 additions & 30 deletions indexer/src/services/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import Block, { BlockAttributes } from '@/models/block';
import { sequelize } from '@/config/database';
import { backfillGuards } from './guards';
import { Transaction } from 'sequelize';
import { PriceUpdaterService } from './price/price-updater.service';
import { defineCanonicalBaseline } from '@/services/define-canonical';
import { startMissingBlocksBeforeStreamingProcess } from '@/services/missing';
import {
fillChainGapsBeforeDefiningCanonicalBaseline,
startMissingBlocksBeforeStreamingProcess,
} from '@/services/missing';
import { EventAttributes } from '@/models/event';
import { startPairCreation } from '@/services/start-pair-creation';

Expand Down Expand Up @@ -53,10 +55,7 @@ export async function startStreaming() {
await startMissingBlocksBeforeStreamingProcess();

const nextBlocksToProcess: any[] = [];
const blocksRecentlyProcessed = new Set<string>();

// Initialize price updater
PriceUpdaterService.getInstance();
const initialChainGapsAlreadyFilled = new Set<number>();

// Initialize EventSource connection to the blockchain node
const eventSource = new EventSource(`${SYNC_BASE_URL}/${SYNC_NETWORK}/block/updates`);
Expand All @@ -78,27 +77,54 @@ export async function startStreaming() {
};

const processBlock = async (block: any) => {
const blockIdentifier = block.header.hash;

if (blocksRecentlyProcessed.has(blockIdentifier)) {
await defineCanonicalBaseline(
block.header.hash,
block.header.parent,
block.header.height,
block.header.chainId,
const blockHash = block.header.hash;

let blockInDatabase: Block | null = null;
try {
blockInDatabase = await Block.findOne({ where: { hash: blockHash } });
} catch (error) {
console.error(
'[ERROR][DB][STREAMING] There was an error with the database:',
blockHash,
error,
);
process.exit(1);
}

if (blockInDatabase) {
await defineCanonicalBaseline(block.header.hash);
return;
}

let tx: Transaction;
try {
tx = await sequelize.transaction();
} catch (error) {
console.error(
'[ERROR][DB][STREAMING] Failed to start transaction for new block:',
blockHash,
error,
);
return;
}

const tx = await sequelize.transaction();
try {
const payload = processPayload(block.payloadWithOutputs);

// Save the block data and process its transactions
// TODO: [CONSISTENCY] Validate saveBlock result; if null/failed, handle with rollback + DLQ + metric to avoid partial commits
await saveBlock({ header: block.header, payload, canonical: null }, tx);

if (!initialChainGapsAlreadyFilled.has(block.header.chainId)) {
initialChainGapsAlreadyFilled.add(block.header.chainId);
await fillChainGapsBeforeDefiningCanonicalBaseline({
chainId: block.header.chainId,
lastHeight: block.header.height,
tx,
});
}

await tx.commit();
blocksRecentlyProcessed.add(blockIdentifier);
} catch (error) {
await tx.rollback();
// TODO: [OBS][METRICS] Increment 'stream.block_failures' with tags { chainId, reason: 'processing' }
Expand All @@ -112,12 +138,7 @@ export async function startStreaming() {
return;
}

await defineCanonicalBaseline(
block.header.hash,
block.header.parent,
block.header.height,
block.header.chainId,
);
await defineCanonicalBaseline(block.header.hash);
};

const processBlocks = async () => {
Expand Down Expand Up @@ -146,14 +167,6 @@ export async function startStreaming() {
setTimeout(processBlocks, 1000);
};

setInterval(
() => {
blocksRecentlyProcessed.clear();
console.info('[INFO][SYNC][STREAMING] blocksRecentlyProcessed cleared');
},
1000 * 60 * 60 * 1, // 1 hour
);

processBlocks();
backfillGuards();

Expand Down