Skip to content

Commit 55e6d23

Browse files
committed
refat: added a table to track blocks that were not saved in the database.
1 parent 6ca8aca commit 55e6d23

File tree

3 files changed

+61
-12
lines changed

3 files changed

+61
-12
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { Model, DataTypes, Optional } from "sequelize";
2+
import { sequelize } from "../config/database";
3+
4+
interface StreamingErrorAttributes {
5+
id: number;
6+
chainId: number;
7+
hash: string;
8+
}
9+
10+
interface StreamingErrorCreationAttributes
11+
extends Optional<StreamingErrorAttributes, "id"> {}
12+
13+
class StreamingError
14+
extends Model<StreamingErrorAttributes, StreamingErrorCreationAttributes>
15+
implements StreamingErrorAttributes
16+
{
17+
declare id: number;
18+
declare chainId: number;
19+
declare hash: string;
20+
}
21+
22+
StreamingError.init(
23+
{
24+
id: { type: DataTypes.INTEGER, autoIncrement: true, primaryKey: true },
25+
chainId: { type: DataTypes.INTEGER },
26+
hash: { type: DataTypes.STRING },
27+
},
28+
{
29+
sequelize,
30+
modelName: "StreamingError",
31+
tableName: "StreamingErrors",
32+
timestamps: true,
33+
},
34+
);
35+
36+
export default StreamingError;

indexer/src/services/sync/guards.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import pLimit from "p-limit";
22
import { rootPgPool, sequelize } from "../../config/database";
33
import { getGuardsFromBalances } from "./payload";
44
import Guard from "../../models/guard";
5-
import { delay } from "../../utils/helpers";
65

76
const CONCURRENCY_LIMIT = 4; // Number of concurrent fetches allowed
87
const limitFetch = pLimit(CONCURRENCY_LIMIT);

indexer/src/services/sync/streaming.ts

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { dispatch, DispatchInfo } from "../../jobs/publisher-job";
55
import { uint64ToInt64 } from "../../utils/int-uint-64";
66
import Block, { BlockAttributes } from "../../models/block";
77
import { sequelize } from "../../config/database";
8+
import StreamingError from "../../models/streaming-error";
89

910
const SYNC_BASE_URL = getRequiredEnvString("SYNC_BASE_URL");
1011
const SYNC_NETWORK = getRequiredEnvString("SYNC_NETWORK");
@@ -44,26 +45,39 @@ export async function startStreaming() {
4445
blocksToProcess.push(b);
4546
}
4647

48+
console.log("Processing blocks:", blocksToProcess.length);
4749
const promises = blocksToProcess.map(async (block: any) => {
48-
try {
49-
return saveBlock(block);
50-
} catch (error) {
51-
console.error("Error saving block:", error);
50+
const blockData = await saveBlock(block);
51+
if (blockData === null) {
52+
await StreamingError.create({
53+
hash: block.header.hash,
54+
chainId: block.header.chainId,
55+
});
5256
}
57+
return blockData;
5358
});
5459

55-
const res = (await Promise.all(promises)).filter(
56-
(r) => r !== null,
60+
const processed = (await Promise.all(promises)).filter(
61+
(r) => r !== null || r !== undefined,
5762
) as DispatchInfo[];
5863

59-
const dispatches = res.map(async (r, index) => {
60-
await dispatch(r);
61-
await delay(500);
62-
console.log("Dispatched block:", index);
64+
const dispatches = processed.map(async (r) => {
65+
try {
66+
await delay(500);
67+
await dispatch(r);
68+
} catch (err) {
69+
console.error("Error dispatching block:", err);
70+
}
6371
});
6472

6573
await Promise.all(dispatches);
66-
console.log("Done processing blocks: ", blocksToProcess.length);
74+
console.log(
75+
"Processed:",
76+
processed.length,
77+
"|",
78+
"Dispatched:",
79+
dispatches.length,
80+
);
6781
}, 1000 * 10);
6882

6983
setInterval(

0 commit comments

Comments
 (0)