Skip to content

Commit 9452c36

Browse files
committed
fix: subscription
1 parent a068f12 commit 9452c36

File tree

4 files changed

+34
-54
lines changed

4 files changed

+34
-54
lines changed

indexer/src/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ async function main() {
3434
}
3535

3636
if (options.streaming) {
37-
await initializeDatabase();
3837
await startStreaming();
3938
} else if (options.guards) {
4039
await startGuardsBackfill();

indexer/src/jobs/publisher-job.ts

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
11
import zod from "zod";
22
import { getRequiredEnvString } from "../utils/helpers";
33

4-
const eventsToPublish: Array<DispatchInfo> = [];
5-
64
const KADENA_GRAPHQL_URL = getRequiredEnvString("KADENA_GRAPHQL_API_URL");
75
const KADENA_GRAPHQL_PORT = getRequiredEnvString("KADENA_GRAPHQL_API_PORT");
86

9-
const DISPATCH_INTERVAL = 1000; // 1 second
10-
117
export const dispatchInfoSchema = zod.object({
128
hash: zod.string(),
139
chainId: zod.string(),
@@ -18,20 +14,7 @@ export const dispatchInfoSchema = zod.object({
1814

1915
export type DispatchInfo = zod.infer<typeof dispatchInfoSchema>;
2016

21-
export function startPublisher() {
22-
setInterval(() => {
23-
const [first] = eventsToPublish.splice(0, 1);
24-
if (first) {
25-
dispatch(first);
26-
}
27-
}, DISPATCH_INTERVAL);
28-
}
29-
30-
export const addPublishEvents = (events: DispatchInfo[]) => {
31-
// eventsToPublish.push(...events);
32-
};
33-
34-
const dispatch = async (dispatchInfo: DispatchInfo) => {
17+
export const dispatch = async (dispatchInfo: DispatchInfo) => {
3518
try {
3619
const url = `${KADENA_GRAPHQL_URL}:${KADENA_GRAPHQL_PORT}/new-block`;
3720
const response = await fetch(url, {

indexer/src/services/sync/payload.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,21 @@ export async function processPayloadKey(
2525
block: BlockAttributes,
2626
payloadData: any,
2727
tx?: Transaction,
28-
) {
28+
): Promise<EventAttributes[]> {
2929
const transactions = payloadData.transactions || [];
3030

3131
const transactionPromises = transactions.map((transactionArray: any) =>
3232
processTransaction(transactionArray, block, tx),
3333
);
3434

35-
await Promise.all(transactionPromises);
35+
return (await Promise.all(transactionPromises)).flat();
3636
}
3737

3838
export async function processTransaction(
3939
transactionArray: any,
4040
block: BlockAttributes,
4141
tx?: Transaction,
42-
) {
42+
): Promise<EventAttributes[]> {
4343
const transactionInfo = transactionArray[TRANSACTION_INDEX];
4444
const receiptInfo = transactionArray[RECEIPT_INDEX];
4545

@@ -192,8 +192,11 @@ export async function processTransaction(
192192
});
193193

194194
await saveGuards(insertedBalances ?? [], tx);
195+
196+
return eventsAttributes;
195197
} catch (error) {
196198
console.error(`Error saving transaction to the database: ${error}`);
199+
return [];
197200
}
198201
}
199202

indexer/src/services/sync/streaming.ts

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { processPayloadKey } from "./payload";
2-
import { getDecoded, getRequiredEnvString } from "../../utils/helpers";
2+
import { delay, getDecoded, getRequiredEnvString } from "../../utils/helpers";
33
import EventSource from "eventsource";
4-
import { DispatchInfo } from "../../jobs/publisher-job";
4+
import { dispatch, DispatchInfo } from "../../jobs/publisher-job";
55
import { uint64ToInt64 } from "../../utils/int-uint-64";
66
import Block, { BlockAttributes } from "../../models/block";
77
import { sequelize } from "../../config/database";
@@ -46,13 +46,23 @@ export async function startStreaming() {
4646

4747
const promises = blocksToProcess.map(async (block: any) => {
4848
try {
49-
await saveBlock(block);
49+
return saveBlock(block);
5050
} catch (error) {
5151
console.error("Error saving block:", error);
5252
}
5353
});
5454

55-
await Promise.all(promises);
55+
const res = (await Promise.all(promises)).filter(
56+
(r) => r !== null,
57+
) as DispatchInfo[];
58+
59+
const dispatches = res.map(async (r, index) => {
60+
await dispatch(r);
61+
await delay(500);
62+
console.log("Dispatched block:", index);
63+
});
64+
65+
await Promise.all(dispatches);
5666
console.log("Done processing blocks: ", blocksToProcess.length);
5767
}, 1000 * 10);
5868

@@ -122,42 +132,27 @@ async function saveBlock(parsedData: any): Promise<DispatchInfo | null> {
122132
transaction: tx,
123133
});
124134

125-
await processPayloadKey(createdBlock, payloadData, tx);
126-
127-
const txs: Array<{
128-
requestKey: string;
129-
qualifiedEventNames: string[];
130-
}> = transactions.map((t: any) => {
131-
const qualifiedEventNames = t[1].events.map((e: any) => {
132-
const module = e.module.namespace
133-
? `${e.module.namespace}.${e.module.name}`
134-
: e.module.name;
135-
const name = e.name;
136-
return `${module}.${name}`;
137-
});
138-
return {
139-
requestKey: t[1].reqKey,
140-
qualifiedEventNames,
141-
};
142-
});
135+
const eventsCreated = await processPayloadKey(
136+
createdBlock,
137+
payloadData,
138+
tx,
139+
);
140+
141+
const uniqueRequestKeys = new Set(
142+
eventsCreated.map((t) => t.requestkey).filter(Boolean),
143+
);
143144

144-
const events = transactions.flatMap((t: any) => t.qualifiedEventNames);
145-
const qualifiedEventNamesSet = new Set();
146-
const uniqueQualifiedEventNames = events.filter(
147-
(qualifiedEventName: any) => {
148-
const isDuplicate = qualifiedEventNamesSet.has(qualifiedEventName);
149-
qualifiedEventNamesSet.add(qualifiedEventName);
150-
return !isDuplicate;
151-
},
145+
const uniqueQualifiedEventNames = new Set(
146+
eventsCreated.map((t) => `${t.module}.${t.name}`).filter(Boolean),
152147
);
153148

154149
await tx.commit();
155150
return {
156151
hash: createdBlock.hash,
157152
chainId: createdBlock.chainId.toString(),
158153
height: createdBlock.height,
159-
requestKeys: txs.map((t) => t.requestKey),
160-
qualifiedEventNames: uniqueQualifiedEventNames,
154+
requestKeys: Array.from(uniqueRequestKeys),
155+
qualifiedEventNames: Array.from(uniqueQualifiedEventNames),
161156
};
162157
} catch (error) {
163158
await tx.rollback();

0 commit comments

Comments
 (0)