Skip to content

Commit 7961edc

Browse files
Merge pull request #220 from blockful/dev
Dev
2 parents b10ff96 + ea14f11 commit 7961edc

5 files changed

Lines changed: 61 additions & 20 deletions

File tree

apps/dispatcher/src/services/triggers/voting-power-trigger.service.test.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,4 +845,41 @@ describe('VotingPowerTriggerHandler - eventId deduplication', () => {
845845
// Should send 2 notifications (one for each delegation received)
846846
expect(sentNotifications).toHaveLength(2);
847847
});
848+
849+
it('should NOT send duplicate notifications when same message is processed twice (idempotency)', async () => {
850+
// Simulates the Logic System re-sending the same event due to cursor not advancing
851+
// The deduplication should block the second run entirely
852+
853+
const walletA = '0xWalletA000000000000000000000000000000001';
854+
const stubUser1: User = { id: 'user-1', channel: 'telegram', channel_user_id: '111', created_at: new Date() };
855+
856+
const { handler, sentNotifications } = createHandlerWithDeduplication({
857+
[walletA]: [stubUser1]
858+
});
859+
860+
const message: DispatcherMessage = {
861+
triggerId: 'voting-power-changed',
862+
events: [
863+
{
864+
daoId: 'test-dao',
865+
accountId: walletA,
866+
transactionHash: '0xDuplicateTxHash',
867+
changeType: 'other',
868+
delta: '1000',
869+
logIndex: 0,
870+
chainId: 1,
871+
timestamp: 1234567890
872+
}
873+
]
874+
};
875+
876+
// First run: should send notification
877+
await handler.handleMessage(message);
878+
const firstRunCount = sentNotifications.length;
879+
expect(firstRunCount).toBe(1);
880+
881+
// Second run (same message): deduplication should block it
882+
await handler.handleMessage(message);
883+
expect(sentNotifications.length).toBe(firstRunCount);
884+
});
848885
});

apps/dispatcher/src/services/triggers/voting-power-trigger.service.ts

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
100100
): Promise<void> {
101101
const { daoId, accountId, sourceAccountId, delta, transactionHash, chainId, votingPower, logIndex } = votingPowerEvent;
102102

103-
const subscribers = await this.getNotificationSubscribers(
103+
const { subscribers, eventId } = await this.getNotificationSubscribers(
104104
accountId, // who receives the delegation
105105
daoId,
106106
transactionHash,
@@ -137,7 +137,7 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
137137
chainId
138138
});
139139

140-
await this.sendNotificationsToSubscribers(subscribers, notificationMessage, transactionHash, daoId, metadata, buttons);
140+
await this.sendNotificationsToSubscribers(subscribers, notificationMessage, eventId, daoId, metadata, buttons);
141141
}
142142

143143
/**
@@ -160,7 +160,7 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
160160
return;
161161
}
162162

163-
const subscribers = await this.getNotificationSubscribers(
163+
const { subscribers, eventId } = await this.getNotificationSubscribers(
164164
sourceAccountId, // who MADE the delegation
165165
daoId,
166166
transactionHash,
@@ -219,7 +219,7 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
219219
chainId
220220
});
221221

222-
await this.sendNotificationsToSubscribers(subscribers, notificationMessage, transactionHash, daoId, metadata, buttons);
222+
await this.sendNotificationsToSubscribers(subscribers, notificationMessage, eventId, daoId, metadata, buttons);
223223
}
224224

225225
/**
@@ -232,7 +232,7 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
232232
): Promise<void> {
233233
const { daoId, accountId, changeType, transactionHash, chainId, transfer, logIndex } = votingPowerEvent;
234234

235-
const subscribers = await this.getNotificationSubscribers(
235+
const { subscribers, eventId } = await this.getNotificationSubscribers(
236236
accountId, daoId, transactionHash, logIndex, walletOwnersMap, daoSubscribersMap
237237
);
238238
if (subscribers.length === 0) return;
@@ -242,7 +242,7 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
242242
: this.buildGenericNotification(votingPowerEvent);
243243

244244
const buttons = buildButtons({ triggerType: 'votingPowerChange', txHash: transactionHash, chainId });
245-
await this.sendNotificationsToSubscribers(subscribers, message, transactionHash, daoId, metadata, buttons);
245+
await this.sendNotificationsToSubscribers(subscribers, message, eventId, daoId, metadata, buttons);
246246
}
247247

248248
/**
@@ -293,7 +293,6 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
293293

294294
/**
295295
* Shared method to get notification subscribers with deduplication
296-
*
297296
* eventId format: ${transactionHash}-${logIndex}-${accountId}-voting-power
298297
*/
299298
private async getNotificationSubscribers(
@@ -303,10 +302,12 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
303302
logIndex: number,
304303
walletOwnersMap: Record<string, User[]>,
305304
daoSubscribersMap: Record<string, User[]>
306-
): Promise<User[]> {
305+
): Promise<{ subscribers: User[]; eventId: string }> {
306+
const eventId = `${transactionHash}-${logIndex}-${accountId}-voting-power`;
307+
307308
// Get wallet owners from cache
308309
const walletOwners = walletOwnersMap[accountId] || [];
309-
if (walletOwners.length === 0) return [];
310+
if (walletOwners.length === 0) return { subscribers: [], eventId };
310311

311312
// Get DAO subscribers from cache
312313
const daoSubscribers = daoSubscribersMap[daoId] || [];
@@ -316,8 +317,8 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
316317
daoSubscribers.some(sub => sub.id === owner.id)
317318
);
318319

319-
if (subscribedOwners.length === 0) return [];
320-
const eventId = `${transactionHash}-${logIndex}-${accountId}-voting-power`;
320+
if (subscribedOwners.length === 0) return { subscribers: [], eventId };
321+
321322
// Check deduplication for all subscribed owners at once
322323
const shouldSendNotifications = await this.subscriptionClient.shouldSend(
323324
subscribedOwners,
@@ -326,10 +327,10 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
326327
);
327328

328329
// Final filtered list of subscribers
329-
const finalSubscribers = subscribedOwners.filter(owner =>
330+
const subscribers = subscribedOwners.filter(owner =>
330331
shouldSendNotifications.some(notification => notification.user_id === owner.id)
331332
);
332-
return finalSubscribers;
333+
return { subscribers, eventId };
333334
}
334335

335336
/**

apps/integrated-tests/tests/slack/slack-voting-power-trigger.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,12 @@ describe('Slack Voting Power Trigger - Integration Test', () => {
119119
}
120120

121121
// Verify database records
122+
// eventId format: ${transactionHash}-${logIndex}-${accountId}-voting-power
123+
const expectedEventId = `0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef-0-${userWalletAddress}-voting-power`;
122124
const notifications = await dbHelper.getNotifications();
123125
const vpNotification = notifications.find(n =>
124126
n.user_id === user.id &&
125-
n.event_id === '0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef' &&
127+
n.event_id === expectedEventId &&
126128
n.dao_id === TEST_DAO_ID
127129
);
128130
expect(vpNotification).toBeDefined();

apps/logic-system/src/triggers/voting-power-changed-trigger.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,10 @@ export class VotingPowerChangedTrigger extends Trigger<ProcessedVotingPowerHisto
4747

4848
await this.dispatcherService.sendMessage(message);
4949

50-
// Update the last processed timestamp to the most recent timestamp
50+
// Update the last processed timestamp to the most recent timestamp + 1 second
5151
// Since data comes ordered by timestamp asc, the last item has the latest timestamp
52-
this.lastProcessedTimestamp = data[data.length - 1].timestamp;
52+
// Adding 1 avoids reprocessing the same event since the API uses >= (gte) for fromDate
53+
this.lastProcessedTimestamp = String(Number(data[data.length - 1].timestamp) + 1);
5354
}
5455

5556
/**

apps/logic-system/tests/voting-power-trigger.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,12 @@ describe('VotingPowerChangedTrigger', () => {
6666
});
6767
});
6868

69-
it('should update timestamp to the last item in array', async () => {
69+
it('should update timestamp to the last item in array + 1 second', async () => {
7070
await trigger.process(mockVotingPowerData);
7171

7272
// Should update to timestamp of last item (chronologically last)
7373
const lastProcessed = (trigger as any).lastProcessedTimestamp;
74-
expect(lastProcessed).toBe('1625184000'); // Last item timestamp
74+
expect(lastProcessed).toBe('1625184001'); // Last item timestamp + 1
7575
});
7676

7777
it('should handle single item correctly', async () => {
@@ -85,7 +85,7 @@ describe('VotingPowerChangedTrigger', () => {
8585
});
8686

8787
const lastProcessed = (trigger as any).lastProcessedTimestamp;
88-
expect(lastProcessed).toBe('1625097600');
88+
expect(lastProcessed).toBe('1625097601'); // timestamp + 1
8989
});
9090
});
9191

@@ -109,7 +109,7 @@ describe('VotingPowerChangedTrigger', () => {
109109

110110
// Verify the second call used the updated timestamp
111111
const secondCallArgs = mockVotingPowerRepository.listVotingPowerHistory.mock.calls[1][0];
112-
expect(secondCallArgs).toBe('1625097600'); // Timestamp from first execution
112+
expect(secondCallArgs).toBe('1625097601'); // Timestamp from first execution
113113
});
114114

115115
it('should not process when no new data available', async () => {

0 commit comments

Comments
 (0)