Skip to content

Commit 01c265a

Browse files
Merge pull request #118 from blockful/refactor/retry-logic
Refactor/retry logic
2 parents d5fbfc7 + 487bab1 commit 01c265a

13 files changed

Lines changed: 759 additions & 219 deletions

File tree

apps/integrated-tests/jest.config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ export default {
1010
forceExit: true,
1111
silent: true,
1212
verbose: true,
13-
};
13+
};

apps/logic-system/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
"@typescript-eslint/parser": "^5.57.1",
2828
"eslint": "^8.37.0",
2929
"jest": "^29.5.0",
30+
"jest-mock": "^29.7.0",
3031
"ts-jest": "^29.1.0",
3132
"ts-node": "10.9.1",
3233
"typescript": "^4.9.5"

apps/logic-system/src/triggers/base-trigger.ts

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,18 @@ export abstract class Trigger<TData, TFilterOptions = void> {
2424
*/
2525
protected options?: TFilterOptions;
2626

27+
/**
28+
* Counter for consecutive failures
29+
* @private
30+
*/
31+
private consecutiveFailures = 0;
32+
33+
/**
34+
* Maximum consecutive failures before stopping the trigger
35+
* @private
36+
*/
37+
private readonly maxConsecutiveFailures = 5;
38+
2739
constructor(id: string, interval: number) {
2840
this.id = id;
2941
this.interval = interval;
@@ -47,7 +59,7 @@ export abstract class Trigger<TData, TFilterOptions = void> {
4759
/**
4860
* Starts the trigger to run at the specified interval
4961
* @param options Options for filtering data
50-
* @throws {Error} If there's an error during trigger execution
62+
* @throws {Error} If maximum consecutive failures are reached
5163
*/
5264
start(options: TFilterOptions): void {
5365
if (this.timer) {
@@ -60,9 +72,9 @@ export abstract class Trigger<TData, TFilterOptions = void> {
6072
try {
6173
const data = await this.fetchData(options);
6274
await this.process(data, this.options);
75+
if (this.consecutiveFailures > 0) this.resetConsecutiveFailures();
6376
} catch (error) {
64-
await this.stop();
65-
throw new Error(`Error in trigger execution (${this.id}): ${error instanceof Error ? error.message : 'Unknown error'}`);
77+
await this.handleError(error);
6678
}
6779
}, this.interval);
6880
}
@@ -76,4 +88,27 @@ export abstract class Trigger<TData, TFilterOptions = void> {
7688
this.timer = null;
7789
}
7890
}
91+
92+
/**
93+
* Handles errors from the trigger.
94+
* If the error is not recoverable, the trigger will stop.
95+
* If the error is recoverable, the trigger will retry on the next interval.
96+
* @param error Error object
97+
*/
98+
private async handleError(error: unknown): Promise<void> {
99+
this.consecutiveFailures++;
100+
if (this.consecutiveFailures >= this.maxConsecutiveFailures) {
101+
console.error(`[Trigger ${this.id}] Stopped after ${this.consecutiveFailures} consecutive failures. Last error: ${error instanceof Error ? error.message : 'Unknown error'}`);
102+
await this.stop();
103+
return;
104+
}
105+
console.log(`[Trigger ${this.id}] Will retry on next interval. Failures: ${this.consecutiveFailures}/${this.maxConsecutiveFailures}`);
106+
}
107+
108+
/**
109+
* Resets the consecutive failures counter.
110+
*/
111+
private resetConsecutiveFailures(): void {
112+
this.consecutiveFailures = 0;
113+
}
79114
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
import { describe, it, expect, jest, beforeEach, afterEach } from '@jest/globals';
2+
import type { MockedFunction } from 'jest-mock';
3+
import { Trigger } from '../src/triggers/base-trigger';
4+
5+
// Mock implementation of Trigger for testing
6+
class MockTrigger extends Trigger<any> {
7+
fetchData: MockedFunction<() => Promise<any[]>>;
8+
process: MockedFunction<() => Promise<void>>;
9+
10+
constructor(id: string = 'test-trigger', interval: number = 1000) {
11+
super(id, interval);
12+
this.fetchData = jest.fn<() => Promise<any[]>>();
13+
this.process = jest.fn<() => Promise<void>>();
14+
}
15+
16+
// Expose private methods for testing
17+
getConsecutiveFailures() {
18+
return (this as any).consecutiveFailures;
19+
}
20+
}
21+
22+
describe('BaseTrigger - Retry Logic', () => {
23+
let trigger: MockTrigger;
24+
25+
beforeEach(() => {
26+
trigger = new MockTrigger('test-trigger', 100);
27+
jest.useFakeTimers();
28+
});
29+
30+
afterEach(() => {
31+
trigger.stop();
32+
jest.clearAllTimers();
33+
jest.useRealTimers();
34+
jest.clearAllMocks();
35+
});
36+
37+
describe('Retry with consecutive failures', () => {
38+
it('should continue running after 3 consecutive failures', async () => {
39+
trigger.fetchData.mockRejectedValue(new Error('Test error'));
40+
41+
trigger.start();
42+
43+
// Simulate 3 failures
44+
for (let i = 1; i <= 3; i++) {
45+
jest.advanceTimersByTime(100);
46+
await Promise.resolve();
47+
}
48+
49+
expect(trigger.fetchData).toHaveBeenCalledTimes(3);
50+
expect(trigger.getConsecutiveFailures()).toBe(3);
51+
52+
// Trigger should still be running (timer not null)
53+
expect((trigger as any).timer).not.toBeNull();
54+
});
55+
});
56+
57+
describe('Reset counter after success', () => {
58+
it('should reset consecutive failures counter after successful execution', async () => {
59+
const mockData = [{ id: 1 }, { id: 2 }];
60+
61+
// First 2 calls fail, third succeeds
62+
trigger.fetchData
63+
.mockRejectedValueOnce(new Error('Error 1'))
64+
.mockRejectedValueOnce(new Error('Error 2'))
65+
.mockResolvedValueOnce(mockData);
66+
67+
trigger.process.mockResolvedValue(undefined);
68+
69+
trigger.start();
70+
71+
// First failure
72+
jest.advanceTimersByTime(100);
73+
await Promise.resolve(); //fetchData
74+
await Promise.resolve(); //process
75+
expect(trigger.getConsecutiveFailures()).toBe(1);
76+
77+
// Second failure
78+
jest.advanceTimersByTime(100);
79+
await Promise.resolve(); //fetchData
80+
await Promise.resolve(); //process
81+
expect(trigger.getConsecutiveFailures()).toBe(2);
82+
83+
// Third attempt succeeds
84+
jest.advanceTimersByTime(100);
85+
await Promise.resolve(); //fetchData
86+
await Promise.resolve(); //process
87+
88+
expect(trigger.fetchData).toHaveBeenCalledTimes(3);
89+
expect(trigger.process).toHaveBeenCalledWith(mockData, undefined);
90+
expect(trigger.getConsecutiveFailures()).toBe(0);
91+
});
92+
});
93+
94+
describe('Stop after 5 consecutive failures', () => {
95+
it('should stop the trigger after 5 consecutive failures', async () => {
96+
trigger.fetchData.mockRejectedValue(new Error('Persistent error'));
97+
98+
trigger.start();
99+
100+
// Simulate 5 failures
101+
for (let i = 1; i <= 5; i++) {
102+
jest.advanceTimersByTime(100);
103+
await Promise.resolve(); //fetchData
104+
await Promise.resolve(); //process
105+
}
106+
107+
expect(trigger.fetchData).toHaveBeenCalledTimes(5);
108+
expect(trigger.getConsecutiveFailures()).toBe(5);
109+
110+
// Timer should be null (trigger stopped)
111+
expect((trigger as any).timer).toBeNull();
112+
113+
// Advancing time should not trigger more calls
114+
jest.advanceTimersByTime(100);
115+
await Promise.resolve();
116+
expect(trigger.fetchData).toHaveBeenCalledTimes(5); // Still 5, no new calls
117+
});
118+
});
119+
120+
describe('Timer cleanup', () => {
121+
it('should properly clean up timer when stop() is called', async () => {
122+
trigger.fetchData.mockResolvedValue([]);
123+
trigger.process.mockResolvedValue(undefined);
124+
125+
trigger.start();
126+
expect((trigger as any).timer).not.toBeNull();
127+
128+
await trigger.stop();
129+
130+
// Should not make any calls after stop
131+
jest.advanceTimersByTime(100);
132+
await Promise.resolve(); //fetchData
133+
await Promise.resolve(); //process
134+
expect(trigger.fetchData).not.toHaveBeenCalled();
135+
});
136+
});
137+
138+
describe('Error handling in process method', () => {
139+
it('should handle errors from process() method and increment failure counter', async () => {
140+
const mockData = [{ id: 1 }];
141+
trigger.fetchData.mockResolvedValue(mockData);
142+
trigger.process.mockRejectedValue(new Error('Process error'));
143+
144+
trigger.start();
145+
146+
jest.advanceTimersByTime(100);
147+
await Promise.resolve(); //fetchData
148+
await Promise.resolve(); //process
149+
150+
expect(trigger.fetchData).toHaveBeenCalled();
151+
expect(trigger.process).toHaveBeenCalledWith(mockData, undefined);
152+
expect(trigger.getConsecutiveFailures()).toBe(1);
153+
154+
// Trigger should still be running
155+
expect((trigger as any).timer).not.toBeNull();
156+
});
157+
});
158+
});

package.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717
"client": "pnpm --filter @notification-system/anticapture-client"
1818
},
1919
"devDependencies": {
20-
"@types/pg": "^8.15.2",
21-
"pg": "^8.16.0",
20+
"@types/pg": "^8.15.5",
21+
"pg": "^8.16.3",
2222
"ts-node": "^10.9.2",
23-
"turbo": "^2.5.0",
23+
"turbo": "^2.5.5",
2424
"typescript": "5.8.2"
2525
},
26-
"packageManager": "pnpm@9.0.0",
26+
"packageManager": "pnpm@10.14.0",
2727
"engines": {
2828
"node": ">=18"
2929
}

packages/anticapture-client/dist/anticapture-client.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ export declare class AnticaptureClient {
77
private readonly httpClient;
88
constructor(httpClient: AxiosInstance);
99
private query;
10+
private buildHeaders;
1011
/**
1112
* Fetches all DAOs from the anticapture GraphQL API with full type safety
1213
* @returns Array of DAO objects with blockTime added

0 commit comments

Comments
 (0)