Commit 7f236dd

Merge pull request #135 from hack-a-chain-software/improvements

refactor: added a table to track blocks that were not saved in the database.

2 parents 6ca8aca + c6f8299

File tree: 6 files changed, +159 −14 lines

indexer/Dockerfile.development

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+FROM node:18-alpine
+
+WORKDIR /app
+
+COPY package.json .
+RUN yarn install --frozen-lockfile
+RUN yarn global add ts-node dotenv-cli
+
+COPY . .
+
+EXPOSE 3001
+
+CMD ["yarn", "dev"]
Lines changed: 83 additions & 0 deletions

@@ -0,0 +1,83 @@
+version: '3.8'
+
+services:
+  indexer-db:
+    image: postgres
+    container_name: kad-indexer-postgres
+    environment:
+      POSTGRES_USER: ${DB_USERNAME}
+      POSTGRES_PASSWORD: ${DB_PASSWORD}
+      POSTGRES_DB: ${DB_NAME}
+    ports:
+      - "5432:5432"
+    healthcheck:
+      test: ["CMD-SHELL", "pg_isready -U ${DB_USERNAME}"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  db-migration:
+    build:
+      context: .
+      dockerfile: Dockerfile.development
+    container_name: db-migration
+    environment:
+      DB_HOST: indexer-db
+    command: >
+      /bin/sh -c "
+      rm -f /app/shared/db-migration-complete &&
+      rm -f /app/shared/graphql-running &&
+      yarn create:database &&
+      touch /app/shared/db-migration-complete"
+    depends_on:
+      indexer-db:
+        condition: service_healthy
+    volumes:
+      - shared-data:/app/shared
+
+  graphql-app:
+    build:
+      context: .
+      dockerfile: Dockerfile.development
+    container_name: kad-indexer-graphql
+    environment:
+      DB_HOST: indexer-db
+    command: >
+      /bin/sh -c "
+      while [ ! -f /app/shared/db-migration-complete ]; do
+      echo 'Waiting for db-migration...';
+      sleep 6;
+      done &&
+      touch /app/shared/graphql-running &&
+      yarn dev:graphql"
+    ports:
+      - "3001:3001"
+    depends_on:
+      db-migration:
+        condition: service_started
+    volumes:
+      - shared-data:/app/shared
+
+  streaming-app:
+    build:
+      context: .
+      dockerfile: Dockerfile.development
+    container_name: kad-indexer-streaming
+    environment:
+      DB_HOST: indexer-db
+      KADENA_GRAPHQL_API_URL: http://graphql-app
+    command: >
+      /bin/sh -c "
+      while [ ! -f /app/shared/graphql-running ]; do
+      echo 'Waiting for graphql-app...';
+      sleep 6;
+      done &&
+      yarn dev:streaming"
+    depends_on:
+      graphql-app:
+        condition: service_started
+    volumes:
+      - shared-data:/app/shared
+
+volumes:
+  shared-data:
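The three services coordinate startup through marker files on the shared-data volume: db-migration clears and then recreates /app/shared/db-migration-complete, graphql-app waits for that file and creates /app/shared/graphql-running, and streaming-app waits for the latter. The same polling pattern the shell loops use could be written in the indexer's own language; a minimal TypeScript sketch (the function name is illustrative, not part of this commit):

import { existsSync } from "fs";
import { setTimeout as sleep } from "timers/promises";

// Poll until a marker file appears on the shared volume, mirroring the
// `while [ ! -f ... ]; do sleep 6; done` loops in the compose commands.
async function waitForMarker(path: string, intervalMs = 6000): Promise<void> {
  while (!existsSync(path)) {
    console.log(`Waiting for ${path}...`);
    await sleep(intervalMs);
  }
}

// Example: block until the migration container has finished.
// waitForMarker("/app/shared/db-migration-complete").then(startGraphql);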

indexer/package.json

Lines changed: 2 additions & 2 deletions
@@ -65,9 +65,9 @@
     "graphql:generate-types": "npx graphql-codegen",
     "dev:database": "ts-node src/index.ts --database",
     "dev:streaming": "ts-node src/index.ts --streaming",
+    "dev:graphql": "ts-node src/index.ts --graphql",
     "dev:old-graphql": "ts-node src/index.ts --oldGraphql",
-    "dev:graphql": "nodemon src/index.ts --graphql",
-    "dev:guards": "nodemon src/index.ts --guards",
+    "dev:hot:graphql": "nodemon src/index.ts --graphql",
     "prod:start": "docker-compose up --build indexer && docker-compose logs -f indexer",
     "prod:streaming": "node dist/index.js --streaming",
     "prod:backfill": "node dist/index.js --backfill",
indexer/src/models/streaming-error.ts

Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+import { Model, DataTypes, Optional } from "sequelize";
+import { sequelize } from "../config/database";
+
+interface StreamingErrorAttributes {
+  id: number;
+  chainId: number;
+  hash: string;
+}
+
+interface StreamingErrorCreationAttributes
+  extends Optional<StreamingErrorAttributes, "id"> {}
+
+class StreamingError
+  extends Model<StreamingErrorAttributes, StreamingErrorCreationAttributes>
+  implements StreamingErrorAttributes
+{
+  declare id: number;
+  declare chainId: number;
+  declare hash: string;
+}
+
+StreamingError.init(
+  {
+    id: { type: DataTypes.INTEGER, autoIncrement: true, primaryKey: true },
+    chainId: { type: DataTypes.INTEGER },
+    hash: { type: DataTypes.STRING },
+  },
+  {
+    sequelize,
+    modelName: "StreamingError",
+    tableName: "StreamingErrors",
+    timestamps: true,
+  },
+);
+
+export default StreamingError;
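With this model, every block the streaming job fails to persist leaves a durable row keyed by chain and hash. A later maintenance pass could read the table back and reattempt those blocks; a minimal sketch, assuming a hypothetical refetchAndSaveBlock helper that is not part of this commit:

import StreamingError from "../../models/streaming-error";

// Hypothetical retry pass over recorded failures: reattempt each block and
// clear its row once the save succeeds. `refetchAndSaveBlock` is assumed.
export async function retryFailedBlocks(
  refetchAndSaveBlock: (chainId: number, hash: string) => Promise<boolean>,
): Promise<void> {
  const failures = await StreamingError.findAll();
  for (const failure of failures) {
    const saved = await refetchAndSaveBlock(failure.chainId, failure.hash);
    if (saved) {
      await failure.destroy(); // row no longer needed once the block is saved
    }
  }
}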

indexer/src/services/sync/guards.ts

Lines changed: 0 additions & 1 deletion
@@ -2,7 +2,6 @@ import pLimit from "p-limit";
 import { rootPgPool, sequelize } from "../../config/database";
 import { getGuardsFromBalances } from "./payload";
 import Guard from "../../models/guard";
-import { delay } from "../../utils/helpers";
 
 const CONCURRENCY_LIMIT = 4; // Number of concurrent fetches allowed
 const limitFetch = pLimit(CONCURRENCY_LIMIT);

indexer/src/services/sync/streaming.ts

Lines changed: 25 additions & 11 deletions
@@ -5,6 +5,7 @@ import { dispatch, DispatchInfo } from "../../jobs/publisher-job";
 import { uint64ToInt64 } from "../../utils/int-uint-64";
 import Block, { BlockAttributes } from "../../models/block";
 import { sequelize } from "../../config/database";
+import StreamingError from "../../models/streaming-error";
 
 const SYNC_BASE_URL = getRequiredEnvString("SYNC_BASE_URL");
 const SYNC_NETWORK = getRequiredEnvString("SYNC_NETWORK");

@@ -44,26 +45,39 @@ export async function startStreaming() {
       blocksToProcess.push(b);
     }
 
+    console.log("Processing blocks:", blocksToProcess.length);
     const promises = blocksToProcess.map(async (block: any) => {
-      try {
-        return saveBlock(block);
-      } catch (error) {
-        console.error("Error saving block:", error);
+      const blockData = await saveBlock(block);
+      if (blockData === null) {
+        await StreamingError.create({
+          hash: block.header.hash,
+          chainId: block.header.chainId,
+        });
       }
+      return blockData;
     });
 
-    const res = (await Promise.all(promises)).filter(
-      (r) => r !== null,
+    const processed = (await Promise.all(promises)).filter(
+      (r) => r !== null && r !== undefined,
     ) as DispatchInfo[];
 
-    const dispatches = res.map(async (r, index) => {
-      await dispatch(r);
-      await delay(500);
-      console.log("Dispatched block:", index);
+    const dispatches = processed.map(async (r) => {
+      try {
+        await delay(500);
+        await dispatch(r);
+      } catch (err) {
+        console.error("Error dispatching block:", err);
+      }
     });
 
     await Promise.all(dispatches);
-    console.log("Done processing blocks: ", blocksToProcess.length);
+    console.log(
+      "Processed:",
+      processed.length,
+      "|",
+      "Dispatched:",
+      dispatches.length,
+    );
   }, 1000 * 10);
 
   setInterval(
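One caveat in the dispatch step: the callbacks created by processed.map all start at once, so each one waits its own 500 ms and then they call dispatch at roughly the same moment; the delay does not actually space the dispatches apart. If staggering is the intent, a sequential loop would achieve it; a sketch reusing the same dispatch and delay helpers from streaming.ts:

// Sketch: dispatch sequentially so the 500 ms delay spaces consecutive calls.
for (const r of processed) {
  try {
    await dispatch(r);
    await delay(500); // real gap before the next dispatch
  } catch (err) {
    console.error("Error dispatching block:", err);
  }
}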
