Skip to content

Commit 7cdcf27

Browse files
committed
refat: subscription with polling
1 parent ec7cb27 commit 7cdcf27

File tree

9 files changed

+233
-148
lines changed

9 files changed

+233
-148
lines changed

indexer/src/kadena-server/config/apollo-server-config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ export type ResolverContext = {
4040
mempoolGateway: MempoolGateway;
4141
pactGateway: PactGateway;
4242
pubSub: PubSub;
43+
signal: AbortSignal;
4344
};
4445

4546
export const createGraphqlContext = () => {
@@ -56,6 +57,7 @@ export const createGraphqlContext = () => {
5657
mempoolGateway: new MempoolApiGateway(),
5758
pactGateway: new PactApiGateway(),
5859
pubSub: publishSubscribe,
60+
signal: new AbortController().signal,
5961
};
6062

6163
return Promise.resolve({

indexer/src/kadena-server/repository/application/block-repository.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ export interface GetBlocksBetweenHeightsParams extends PaginationsParams {
2525
endHeight?: InputMaybe<number>;
2626
}
2727

28+
export interface GetLatestBlocksParams {
29+
creationTime: number;
30+
lastBlockId?: number;
31+
chainIds?: string[];
32+
}
33+
2834
export type BlockOutput = Omit<
2935
Block,
3036
"parent" | "events" | "minerAccount" | "transactions"
@@ -62,6 +68,8 @@ export default interface BlockRepository {
6268

6369
getTotalCountOfBlockEvents(blockHash: string): Promise<number>;
6470

71+
getLatestBlocks(params: GetLatestBlocksParams): Promise<BlockOutput[]>;
72+
6573
getTransactionsOrderedByBlockDepth(
6674
transactions: TransactionOutput[],
6775
): Promise<TransactionOutput[]>;

indexer/src/kadena-server/repository/application/event-repository.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ export interface GetEventParams {
3535
requestKey: string;
3636
}
3737

38+
export interface GetLastEventsParams {
39+
qualifiedEventName: string;
40+
lastEventId?: number;
41+
chainId?: string | null;
42+
minimumDepth?: number | null;
43+
}
44+
3845
export default interface EventRepository {
3946
getEvent(params: GetEventParams): Promise<EventOutput>;
4047
getBlockEvents(params: GetBlockEventsParams): Promise<{
@@ -51,7 +58,9 @@ export default interface EventRepository {
5158
}>;
5259
getTotalEventsCount(hash: GetTotalEventsCount): Promise<number>;
5360
getTotalTransactionEventsCount(
54-
hash: GetTotalTransactionEventsCount
61+
hash: GetTotalTransactionEventsCount,
5562
): Promise<number>;
5663
getTotalCountOfBlockEvents(hash: string): Promise<number>;
64+
getLastEventId(): Promise<number>;
65+
getLastEvents(params: GetLastEventsParams): Promise<EventOutput[]>;
5766
}

indexer/src/kadena-server/repository/infra/repository/block-db-repository.ts

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import BlockRepository, {
66
GetBlocksBetweenHeightsParams,
77
GetBlocksFromDepthParams,
88
GetCompletedBlocksParams,
9+
GetLatestBlocksParams,
910
} from "../../application/block-repository";
1011
import { getPageInfo, getPaginationParams } from "../../pagination";
1112
import { blockValidator } from "../schema-validator/block-schema-validator";
@@ -87,23 +88,55 @@ export default class BlockDbRepository implements BlockRepository {
8788
last,
8889
});
8990

90-
const query: FindOptions<BlockAttributes> = {
91-
where: {
92-
height: { [Op.gte]: startHeight },
93-
...(endHeight && { height: { [Op.lte]: endHeight } }),
94-
...(after && { id: { [Op.lt]: after } }),
95-
...(before && { id: { [Op.gt]: before } }),
96-
...(!!chainIds?.length && { chainId: { [Op.in]: chainIds } }),
97-
},
98-
limit,
99-
order: [["id", order]],
100-
};
91+
const queryParams: (string | number | string[])[] = [limit, startHeight];
92+
let conditions = "";
10193

102-
const rows = await BlockModel.findAll(query);
94+
if (before) {
95+
queryParams.push(before);
96+
conditions += `\nAND b.id > $${queryParams.length}`;
97+
}
10398

104-
const edges = rows.map((row) => ({
99+
if (after) {
100+
queryParams.push(after);
101+
conditions += `\nAND b.id < $${queryParams.length}`;
102+
}
103+
104+
if (chainIds?.length) {
105+
queryParams.push(chainIds);
106+
conditions += `\nAND b."chainId" = $${queryParams.length}`;
107+
}
108+
109+
if (endHeight) {
110+
queryParams.push(endHeight);
111+
conditions += `\nAND b."height" <= $${queryParams.length}`;
112+
}
113+
114+
const query = `
115+
SELECT b.id,
116+
b.hash,
117+
b."chainId",
118+
b."creationTime",
119+
b."epochStart",
120+
b."featureFlags",
121+
b.height as "height",
122+
b.nonce as "nonce",
123+
b."payloadHash" as "payloadHash",
124+
b.weight as "weight",
125+
b.target as "target",
126+
b.adjacents as "adjacents",
127+
b.parent as "parent"
128+
FROM "Blocks" b
129+
WHERE b.height >= $2
130+
${conditions}
131+
ORDER BY b.id ${order}
132+
LIMIT $1;
133+
`;
134+
135+
const { rows: blockRows } = await rootPgPool.query(query, queryParams);
136+
137+
const edges = blockRows.map((row) => ({
105138
cursor: row.id.toString(),
106-
node: blockValidator.mapFromSequelize(row),
139+
node: blockValidator.validate(row),
107140
}));
108141

109142
const pageInfo = getPageInfo({ edges, order, limit, after, before });
@@ -423,6 +456,22 @@ export default class BlockDbRepository implements BlockRepository {
423456
return block?.transactionsCount || 0;
424457
}
425458

459+
async getLatestBlocks(params: GetLatestBlocksParams): Promise<BlockOutput[]> {
460+
const { creationTime, lastBlockId, chainIds = [] } = params;
461+
const blocks = await BlockModel.findAll({
462+
where: {
463+
...(lastBlockId && { id: { [Op.gt]: lastBlockId } }),
464+
creationTime: { [Op.gt]: creationTime },
465+
...(chainIds.length && { chainId: { [Op.in]: chainIds } }),
466+
},
467+
limit: 100,
468+
order: [["id", "DESC"]],
469+
});
470+
471+
const output = blocks.map((b) => blockValidator.mapFromSequelize(b));
472+
return output;
473+
}
474+
426475
async getTransactionsOrderedByBlockDepth(
427476
transactions: TransactionOutput[],
428477
): Promise<TransactionOutput[]> {

indexer/src/kadena-server/repository/infra/repository/event-db-repository.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import EventRepository, {
55
GetBlockEventsParams,
66
GetEventParams,
77
GetEventsParams,
8+
GetLastEventsParams,
89
GetTotalEventsCount,
910
GetTotalTransactionEventsCount,
1011
GetTransactionEventsParams,
@@ -370,4 +371,67 @@ export default class EventDbRepository implements EventRepository {
370371
const totalCount = parseInt(countResult[0].count, 10);
371372
return totalCount;
372373
}
374+
375+
async getLastEventId(): Promise<number> {
376+
const query = `SELECT last_value AS lastValue from "Events_id_seq"`;
377+
const { rows } = await rootPgPool.query(query);
378+
const totalCount = parseInt(rows[0].lastValue, 10);
379+
return totalCount;
380+
}
381+
382+
async getLastEvents({
383+
qualifiedEventName,
384+
lastEventId,
385+
chainId,
386+
minimumDepth,
387+
}: GetLastEventsParams) {
388+
const queryParams = [];
389+
let conditions = "";
390+
let limitCondition = lastEventId ? "LIMIT 5" : "LIMIT 100";
391+
392+
const splitted = qualifiedEventName.split(".");
393+
const name = splitted.pop() ?? "";
394+
const module = splitted.join(".");
395+
396+
queryParams.push(module);
397+
conditions += `WHERE e.module = $${queryParams.length}`;
398+
queryParams.push(name);
399+
conditions += `\nAND e.name = $${queryParams.length}`;
400+
401+
if (lastEventId) {
402+
queryParams.push(lastEventId);
403+
conditions += `\nAND e.id > $${queryParams.length}`;
404+
}
405+
406+
if (chainId) {
407+
queryParams.push(parseInt(chainId));
408+
conditions += `\nAND b."chainId" = $${queryParams.length}`;
409+
}
410+
411+
const query = `
412+
SELECT e.id as id,
413+
e.requestkey as "requestKey",
414+
b."chainId" as "chainId",
415+
b.height as height,
416+
e.module as "moduleName",
417+
e."orderIndex" as "orderIndex",
418+
e.name as name,
419+
e.params as parameters,
420+
b.hash as "blockHash"
421+
FROM "Events" e
422+
JOIN "Transactions" t ON e."transactionId" = t.id
423+
JOIN "Blocks" b ON b.id = t."blockId"
424+
${conditions}
425+
ORDER BY e.id DESC
426+
${limitCondition}
427+
`;
428+
429+
const { rows } = await rootPgPool.query(query, queryParams);
430+
431+
const events = rows
432+
.map((e) => eventValidator.validate(e))
433+
.sort((a, b) => Number(b.id) - Number(a.id));
434+
435+
return events;
436+
}
373437
}
Lines changed: 30 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,41 @@
1-
import { withFilter } from "graphql-subscriptions";
21
import { ResolverContext } from "../../config/apollo-server-config";
32
import { SubscriptionResolvers } from "../../config/graphql-types";
4-
import { eventsQueryResolver } from "../query/events-query-resolver";
5-
import zod from "zod";
3+
import { EventOutput } from "../../repository/application/event-repository";
4+
import { buildEventOutput } from "../output/build-event-output";
65

7-
import { EVENTS_EVENT } from "./consts";
6+
async function* iteratorFn(
7+
context: ResolverContext,
8+
qualifiedEventName: string,
9+
chainId?: string | null,
10+
minimumDepth?: number | null,
11+
): AsyncGenerator<EventOutput[] | undefined, void, unknown> {
12+
let lastEventId = await context.eventRepository.getLastEventId();
13+
while (context.signal) {
14+
const newEvents = await context.eventRepository.getLastEvents({
15+
qualifiedEventName,
16+
lastEventId,
17+
chainId,
18+
minimumDepth,
19+
});
820

9-
const eventsSubscriptionSchema = zod.object({
10-
chainId: zod.string(),
11-
height: zod.number(),
12-
qualifiedEventName: zod.string(),
13-
});
21+
if (newEvents.length > 1) {
22+
lastEventId = Number(newEvents[0].id);
23+
yield newEvents.map((e) => buildEventOutput(e));
24+
}
25+
26+
await new Promise((resolve) => setTimeout(resolve, 1000));
27+
}
28+
}
1429

1530
export const eventsSubscriptionResolver: SubscriptionResolvers<ResolverContext>["events"] =
1631
{
17-
resolve: async (payload: any, _args: any, context: ResolverContext) => {
18-
const res = await (eventsQueryResolver as any)(
19-
{},
20-
{
21-
blockHash: payload.hash,
22-
chainId: payload.chainId,
23-
qualifiedEventName: payload.qualifiedEventName,
24-
},
32+
subscribe: (__root, args, context) => {
33+
return iteratorFn(
2534
context,
35+
args.qualifiedEventName,
36+
args.chainId,
37+
args.minimumDepth,
2638
);
27-
return res.edges.map((e: any) => e.node);
28-
},
29-
subscribe: (_parent, args, context) => {
30-
return {
31-
[Symbol.asyncIterator]: withFilter(
32-
() => context.pubSub.asyncIterator(EVENTS_EVENT),
33-
(payload) => {
34-
const res = eventsSubscriptionSchema.safeParse(payload);
35-
if (!res.success) {
36-
console.info("Invalid payload on eventsSubscription", payload);
37-
return false;
38-
}
39-
const { chainId, height, qualifiedEventName } = res.data;
40-
41-
if (args.chainId && chainId !== args.chainId) {
42-
return false;
43-
}
44-
45-
if (args.minimumDepth && height < args.minimumDepth) {
46-
return false;
47-
}
48-
49-
return qualifiedEventName === args.qualifiedEventName;
50-
},
51-
),
52-
};
5339
},
40+
resolve: (payload: any) => payload,
5441
};

0 commit comments

Comments
 (0)