Skip to content

Commit 504cbef

Browse files
committed
refactor: improved the orphan blocks mechanism to make sure the canonical line is consistent every time a new block is added
1 parent 16fd1e6 commit 504cbef

File tree

6 files changed

+233
-301
lines changed

6 files changed

+233
-301
lines changed

indexer/src/kadena-server/repository/application/block-repository.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { Transaction } from 'sequelize';
12
import { Block, FungibleChainAccount, InputMaybe, PageInfo } from '../../config/graphql-types';
23
import { PaginationsParams } from '../pagination';
34
import { ConnectionEdge } from '../types';
@@ -32,10 +33,6 @@ export interface GetLatestBlocksParams {
3233
quantity: number;
3334
}
3435

35-
export interface UpdateCanonicalStatusParams {
36-
blocks: { hash: string; canonical: boolean }[];
37-
}
38-
3936
export type BlockOutput = Omit<
4037
Block,
4138
'parent' | 'events' | 'minerAccount' | 'transactions' | 'canonical'
@@ -91,9 +88,16 @@ export default interface BlockRepository {
9188
id?: string,
9289
): Promise<BlockOutput[]>;
9390

94-
getBlocksWithSameHeight(height: number, chainId: string): Promise<BlockOutput[]>;
95-
getBlocksWithHeightHigherThan(height: number, chainId: string): Promise<BlockOutput[]>;
96-
updateCanonicalStatus(params: UpdateCanonicalStatusParams): Promise<void>;
91+
getBlocksWithSameHeight(
92+
height: number,
93+
chainId: string,
94+
tx?: Transaction,
95+
): Promise<BlockOutput[]>;
96+
getBlocksWithHeightHigherThan(
97+
height: number,
98+
chainId: string,
99+
tx?: Transaction,
100+
): Promise<BlockOutput[]>;
97101

98102
// dataloader
99103
getBlocksByEventIds(eventIds: string[]): Promise<BlockOutput[]>;

indexer/src/kadena-server/repository/infra/repository/block-db-repository.ts

Lines changed: 27 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* - Optimized batch retrieval through DataLoader patterns
1515
*/
1616

17-
import { Op, QueryTypes } from 'sequelize';
17+
import { Op, QueryTypes, Transaction } from 'sequelize';
1818
import { rootPgPool, sequelize } from '../../../../config/database';
1919
import BlockModel from '../../../../models/block';
2020
import BlockRepository, {
@@ -24,7 +24,6 @@ import BlockRepository, {
2424
GetCompletedBlocksParams,
2525
GetLatestBlocksParams,
2626
GetTotalCountParams,
27-
UpdateCanonicalStatusParams,
2827
} from '../../application/block-repository';
2928
import { getPageInfo, getPaginationParams } from '../../pagination';
3029
import { blockValidator } from '../schema-validator/block-schema-validator';
@@ -896,61 +895,35 @@ export default class BlockDbRepository implements BlockRepository {
896895
return Object.assign({}, ...maxHeightsArray);
897896
}
898897

899-
async getBlocksWithSameHeight(height: number, chainId: string): Promise<BlockOutput[]> {
900-
const query = `
901-
SELECT b.*
902-
FROM "Blocks" b
903-
WHERE b."height" = $1 AND b."chainId" = $2
904-
`;
905-
906-
const { rows } = await rootPgPool.query(query, [height, chainId]);
907-
908-
return rows.map(row => blockValidator.validate(row));
909-
}
910-
911-
async getBlocksWithHeightHigherThan(height: number, chainId: string): Promise<BlockOutput[]> {
912-
const query = `
913-
SELECT b.*
914-
FROM "Blocks" b
915-
WHERE b.height > $1 AND b."chainId" = $2;
916-
`;
917-
918-
const { rows } = await rootPgPool.query(query, [height, chainId]);
898+
async getBlocksWithSameHeight(
899+
height: number,
900+
chainId: string,
901+
tx?: Transaction,
902+
): Promise<BlockOutput[]> {
903+
const blocks = await BlockModel.findAll({
904+
where: {
905+
height,
906+
chainId,
907+
},
908+
transaction: tx,
909+
});
919910

920-
return rows.map(row => blockValidator.validate(row));
911+
return blocks.map(row => blockValidator.mapFromSequelize(row));
921912
}
922913

923-
async updateCanonicalStatus(params: UpdateCanonicalStatusParams) {
924-
const canonicalHashes = params.blocks
925-
.filter(change => change.canonical)
926-
.map(change => change.hash);
927-
const nonCanonicalHashes = params.blocks
928-
.filter(change => !change.canonical)
929-
.map(change => change.hash);
930-
931-
await rootPgPool.query('BEGIN');
932-
try {
933-
if (canonicalHashes.length > 0) {
934-
const canonicalQuery = `
935-
UPDATE "Blocks"
936-
SET "canonical" = true
937-
WHERE hash = ANY($1)
938-
`;
939-
await rootPgPool.query(canonicalQuery, [canonicalHashes]);
940-
}
914+
async getBlocksWithHeightHigherThan(
915+
height: number,
916+
chainId: string,
917+
tx?: Transaction,
918+
): Promise<BlockOutput[]> {
919+
const blocks = await BlockModel.findAll({
920+
where: {
921+
height: { [Op.gt]: height },
922+
chainId,
923+
},
924+
transaction: tx,
925+
});
941926

942-
if (nonCanonicalHashes.length > 0) {
943-
const nonCanonicalQuery = `
944-
UPDATE "Blocks"
945-
SET "canonical" = false
946-
WHERE hash = ANY($1)
947-
`;
948-
await rootPgPool.query(nonCanonicalQuery, [nonCanonicalHashes]);
949-
}
950-
await rootPgPool.query('COMMIT');
951-
} catch (error) {
952-
await rootPgPool.query('ROLLBACK');
953-
throw error;
954-
}
927+
return blocks.map(row => blockValidator.mapFromSequelize(row));
955928
}
956929
}

indexer/src/services/define-canonical.ts

Lines changed: 126 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,58 @@
11
import BlockDbRepository from '@/kadena-server/repository/infra/repository/block-db-repository';
22
import { increaseCounters } from '@/services/counters';
33
import { markCanonicalTip } from '@/utils/canonical-tip';
4+
import { getRequiredEnvString } from '@/utils/helpers';
5+
import { processPayload, saveBlock } from './streaming';
6+
import { sequelize } from '@/config/database';
7+
import { Transaction } from 'sequelize';
48

59
const blockRepository = new BlockDbRepository();
10+
const SYNC_BASE_URL = getRequiredEnvString('SYNC_BASE_URL');
11+
const SYNC_NETWORK = getRequiredEnvString('SYNC_NETWORK');
12+
13+
export async function defineCanonicalBaseline(
14+
blockHash: string,
15+
parentHash: string,
16+
height: number,
17+
chainId: number,
18+
) {
19+
try {
20+
const parentBlock = await blockRepository.getBlockByHash(parentHash);
21+
if (!parentBlock) {
22+
console.log(`[INFO][SYNC][DEFINE_CANONICAL] parentBlock not found. filling gaps ...`);
23+
console.log(`[chainId: ${chainId}, parentHash: ${parentHash}, height: ${height - 1}]`);
24+
await fillChainGapAndConfirmBlockPath(parentHash, height - 1, chainId);
25+
}
26+
} catch (error) {
27+
console.error(`[ERROR][SYNC][DEFINE_CANONICAL] Error filling gaps:`, error);
28+
return;
29+
}
630

7-
export async function defineCanonicalInStreaming(blockHash: string) {
831
const tipBlock = await blockRepository.getBlockByHash(blockHash);
932
if (!tipBlock) {
10-
console.error('[ERROR][DATA][DATA_CORRUPT] Error defining canonical in streaming:', blockHash);
11-
return;
33+
// this scenario should not happen, but if it does, terminate the app.
34+
console.error(`[ERROR][SYNC][DEFINE_CANONICAL] Block ${blockHash} not found in database`);
35+
process.exit(1);
36+
}
37+
38+
let tx: Transaction;
39+
try {
40+
tx = await sequelize.transaction();
41+
} catch (error) {
42+
console.error(`[ERROR][SYNC][DEFINE_CANONICAL] Failed to start transaction:`, error);
43+
throw error;
1244
}
1345

1446
try {
1547
const blocksWithSameHeightOfTipBlock = await blockRepository.getBlocksWithSameHeight(
1648
tipBlock.height,
1749
tipBlock.chainId,
50+
tx,
1851
);
1952
const blocksWithHigherHeightOfTipBlock = await blockRepository.getBlocksWithHeightHigherThan(
2053
tipBlock.height,
2154
tipBlock.chainId,
55+
tx,
2256
);
2357
const {
2458
blocksBecameCanonical,
@@ -30,6 +64,7 @@ export async function defineCanonicalInStreaming(blockHash: string) {
3064
blocksWithSameHeightOfTipBlock,
3165
blocksWithHigherHeightOfTipBlock,
3266
tipBlock,
67+
tx,
3368
});
3469

3570
await increaseCounters({
@@ -38,8 +73,95 @@ export async function defineCanonicalInStreaming(blockHash: string) {
3873
canonicalTransactionsCount: transactionsBecameCanonical - transactionsBecameNonCanonical,
3974
orphanTransactionsCount: transactionsBecameNonCanonical - transactionsBecameCanonical,
4075
chainId: tipBlock.chainId,
76+
tx,
4177
});
78+
await tx.commit();
79+
} catch (error) {
80+
await tx.rollback();
81+
console.error(`[ERROR][SYNC][DEFINE_CANONICAL] Error defining canonical:`, error);
82+
}
83+
}
84+
85+
async function fillChainGapAndConfirmBlockPath(blockHash: string, height: number, chainId: number) {
86+
let blocksByHash: Record<string, any>;
87+
try {
88+
blocksByHash = await fetchBlocksFromChainwebNode(chainId, height);
4289
} catch (error) {
43-
console.error('Error defining canonical:', error);
90+
console.error(`[ERROR][SYNC][FILL_GAPS] Failed to get blocks to fill gaps:`, error);
91+
throw error;
4492
}
93+
94+
let tx: Transaction;
95+
try {
96+
tx = await sequelize.transaction();
97+
} catch (error) {
98+
console.error(`[ERROR][SYNC][FILL_GAPS] Failed to start transaction to fill gaps:`, error);
99+
throw error;
100+
}
101+
102+
let currentHash = blockHash;
103+
try {
104+
while (true) {
105+
const existingBlock = await blockRepository.getBlockByHash(currentHash);
106+
if (existingBlock) {
107+
console.info(
108+
`[INFO][SYNC][FILL_GAPS] Found existing block: ${currentHash}, stopping gap fill`,
109+
);
110+
break;
111+
}
112+
113+
const currentBlockAPIData = blocksByHash[currentHash];
114+
if (!currentBlockAPIData) {
115+
console.info(`[INFO][SYNC][FILL_GAPS] API data all filled, stopping gap fill`);
116+
break;
117+
}
118+
119+
const payload = processPayload(currentBlockAPIData.payloadWithOutputs);
120+
await saveBlock({ header: currentBlockAPIData.header, payload, canonical: true }, tx);
121+
122+
// Move to the parent block
123+
currentHash = currentBlockAPIData.header.parent;
124+
}
125+
await tx.commit();
126+
} catch (err) {
127+
console.error(`[ERROR][SYNC][FILL_GAPS] Failed to save block ${currentHash} in:`, err);
128+
await tx.rollback();
129+
throw err;
130+
}
131+
}
132+
133+
async function fetchBlocksFromChainwebNode(
134+
chainId: number,
135+
height: number,
136+
): Promise<Record<string, any>> {
137+
const cut = await fetch(`${SYNC_BASE_URL}/${SYNC_NETWORK}/cut`, {
138+
method: 'GET',
139+
headers: {
140+
'Content-Type': 'application/json',
141+
},
142+
});
143+
const cutData = await cut.json();
144+
145+
const upperHash = cutData.hashes[chainId].hash;
146+
const MIN_HEIGHT = height - 10; // 10 blocks is the max gap we can fill
147+
const url = `${SYNC_BASE_URL}/${SYNC_NETWORK}/chain/${chainId}/block/branch?minheight=${MIN_HEIGHT}&maxheight=${height}`;
148+
const res = await fetch(url, {
149+
method: 'POST',
150+
headers: {
151+
'Content-Type': 'application/json',
152+
},
153+
body: JSON.stringify({
154+
upper: [upperHash],
155+
}),
156+
});
157+
158+
const data = await res.json();
159+
160+
// Create a map of blocks by hash for easy lookup
161+
const blocksByHash = data.items.reduce((acc: Record<string, any>, item: any) => {
162+
acc[item.header.hash] = item;
163+
return acc;
164+
}, {});
165+
166+
return blocksByHash;
45167
}

0 commit comments

Comments
 (0)