Skip to content

Commit 5f5d9c0

Browse files
authored
Wait on blockchain processing completion when polling (#156)
* wait on processing completion * increase polling frequency * log execution time less frequently * refactor to iterable polling, fix concurrency * start/stop scheduling in project lifecycle hooks
1 parent be49856 commit 5f5d9c0

2 files changed

Lines changed: 77 additions & 5 deletions

File tree

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import { Logger } from "@nestjs/common";
2+
3+
const logger = new Logger("Utils");
4+
5+
export type ExecutableCallback = () => Promise<void>;
6+
7+
export type AsyncIntervalOptions = {
8+
functionToExecute: ExecutableCallback;
9+
name: string;
10+
intervalInMs: number;
11+
};
12+
13+
/**
14+
* Runs the provided function in interval.
15+
* Waits for promise completion until rerunning interval.
16+
*/
17+
export class AsyncIntervalScheduler {
18+
private isRunning: boolean;
19+
private runningTimeoutId: NodeJS.Timeout;
20+
private readonly options: AsyncIntervalOptions;
21+
22+
constructor(options: AsyncIntervalOptions) {
23+
this.options = options;
24+
this.isRunning = false;
25+
}
26+
27+
async start(): Promise<void> {
28+
if (this.isRunning) {
29+
return;
30+
}
31+
this.isRunning = true;
32+
await this.pollIfRunning();
33+
}
34+
35+
stop(): void {
36+
clearTimeout(this.runningTimeoutId);
37+
this.isRunning = false;
38+
}
39+
40+
private async pollIfRunning() {
41+
const { intervalInMs } = this.options;
42+
while (this.isRunning) {
43+
await this.executeCallback();
44+
await new Promise((resolve) => {
45+
this.runningTimeoutId = setTimeout(resolve, intervalInMs);
46+
});
47+
}
48+
}
49+
50+
private async executeCallback() {
51+
const { functionToExecute, name, intervalInMs } = this.options;
52+
try {
53+
const startTime = new Date();
54+
await functionToExecute();
55+
const endTime = new Date();
56+
const runTimeInMs = endTime.getTime() - startTime.getTime();
57+
if (runTimeInMs > intervalInMs) {
58+
logger.debug(`${name} took ${runTimeInMs}ms`);
59+
}
60+
} catch (e) {
61+
logger.error(`${name} failed`, e);
62+
}
63+
}
64+
}

backend/src/flow/services/aggregator.service.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { Injectable, Logger } from "@nestjs/common";
2-
import { Interval } from "@nestjs/schedule";
32
import {
43
FlowBlock,
54
FlowCollection,
@@ -43,6 +42,7 @@ import {
4342
} from "../../processes/managed-process.entity";
4443
import { CommonService } from "../../core/services/common.service";
4544
import { FlowEmulatorService } from "./emulator.service";
45+
import { AsyncIntervalScheduler } from "../../core/async-interval-scheduler";
4646

4747
type BlockData = {
4848
block: FlowBlock;
@@ -70,6 +70,8 @@ export class FlowAggregatorService implements ProjectContextLifecycle {
7070
private projectContext: ProjectEntity | undefined;
7171
private emulatorProcess: ManagedProcessEntity | undefined;
7272
private readonly logger = new Logger(FlowAggregatorService.name);
73+
private readonly processingIntervalMs = 500;
74+
private processingScheduler: AsyncIntervalScheduler;
7375

7476
constructor(
7577
private blockService: BlocksService,
@@ -85,9 +87,16 @@ export class FlowAggregatorService implements ProjectContextLifecycle {
8587
private configService: FlowConfigService,
8688
private processManagerService: ProcessManagerService,
8789
private commonService: CommonService
88-
) {}
90+
) {
91+
this.processingScheduler = new AsyncIntervalScheduler({
92+
name: "Blockchain processing",
93+
intervalInMs: this.processingIntervalMs,
94+
functionToExecute: this.processBlockchainData.bind(this),
95+
});
96+
}
8997

9098
onEnterProjectContext(project: ProjectEntity): void {
99+
this.processingScheduler?.start();
91100
this.projectContext = project;
92101
this.emulatorProcess = this.processManagerService.get(
93102
FlowEmulatorService.processId
@@ -103,6 +112,7 @@ export class FlowAggregatorService implements ProjectContextLifecycle {
103112
}
104113

105114
onExitProjectContext(): void {
115+
this.processingScheduler?.stop();
106116
this.projectContext = undefined;
107117
this.processManagerService.removeListener(
108118
ProcessManagerEvent.PROCESS_ADDED,
@@ -151,9 +161,7 @@ export class FlowAggregatorService implements ProjectContextLifecycle {
151161
return latestUnprocessedBlockHeight - (nextBlockHeightToProcess - 1);
152162
}
153163

154-
// TODO(milestone-x): Next interval shouldn't start before this function resolves
155-
@Interval(1000)
156-
async fetchDataFromDataSource(): Promise<void> {
164+
async processBlockchainData(): Promise<void> {
157165
if (!this.projectContext) {
158166
return;
159167
}

0 commit comments

Comments
 (0)