Skip to content

Commit 3427d6f

Browse files
Merge pull request #130 from blockful/feat/batch-operations
Feat/batch operations
2 parents 8472a19 + fc3e9c7 commit 3427d6f

56 files changed

Lines changed: 2145 additions & 52 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

apps/consumers/src/app.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ export class App {
2020
telegramBotToken: string,
2121
subscriptionServerUrl: string,
2222
httpClient: AxiosInstance,
23-
rabbitmqUrl: string
23+
rabbitmqUrl: string,
24+
ensResolver: EnsResolverService
2425
) {
2526
const subscriptionApi = new SubscriptionAPIService(subscriptionServerUrl);
2627
const anticaptureClient = new AnticaptureClient(httpClient);
27-
const ensResolver = new EnsResolverService();
2828
const daoService = new DAOService(anticaptureClient, subscriptionApi);
2929
const walletService = new WalletService(subscriptionApi, ensResolver);
3030
const explorerService = new ExplorerService();

apps/consumers/src/index.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,20 @@
1313
import axios from 'axios';
1414
import { App } from './app';
1515
import { loadConfig } from './config/env';
16+
import { EnsResolverService } from './services/ens-resolver.service';
1617

1718
const config = loadConfig();
1819

20+
// Create ENS resolver
21+
const ensResolver = new EnsResolverService();
22+
1923
// Create and start the application
2024
const app = new App(
2125
config.telegramBotToken,
2226
config.subscriptionServerUrl,
2327
axios.create({ baseURL: config.anticaptureGraphqlEndpoint }),
24-
config.rabbitmqUrl
28+
config.rabbitmqUrl,
29+
ensResolver
2530
);
2631

2732
(async () => {

apps/consumers/tsconfig.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,5 @@
1717
"resolveJsonModule": true
1818
},
1919
"include": ["src/**/*"],
20-
"exclude": ["node_modules", "dist"]
20+
"exclude": ["node_modules", "dist", "src/**/*.test.ts", "src/**/*.spec.ts"]
2121
}

apps/dispatcher/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
"license": "ISC",
2020
"dependencies": {
2121
"@jest/globals": "^29.7.0",
22+
"@notification-system/anticapture-client": "workspace:*",
2223
"@notification-system/rabbitmq-client": "workspace:*",
2324
"axios": "^1.9.0",
2425
"dotenv": "^16.5.0",

apps/dispatcher/src/app.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,21 @@ import { RabbitMQNotificationService } from './services/notification/rabbitmq-no
77
import { NewProposalTriggerHandler } from './services/triggers/new-proposal-trigger.service';
88
import { VotingPowerTriggerHandler } from './services/triggers/voting-power-trigger.service';
99
import { ProposalFinishedTriggerHandler } from './services/triggers/proposal-finished-trigger.service';
10+
import { NonVotingHandler } from './services/triggers/non-voting-handler.service';
1011
import { RabbitMQConnection, RabbitMQPublisher } from '@notification-system/rabbitmq-client';
12+
import { AnticaptureClient } from '@notification-system/anticapture-client';
1113

1214
export class App {
1315
private rabbitMQConsumerService!: RabbitMQConsumerService;
1416
private rabbitmqConnection!: RabbitMQConnection;
1517
private isCreated = false;
1618

17-
constructor(private subscriptionServerUrl: string, private rabbitmqUrl: string) {}
19+
constructor(
20+
private subscriptionServerUrl: string,
21+
private rabbitmqUrl: string,
22+
private anticaptureGraphqlEndpoint: string,
23+
private anticaptureHttpClient?: any
24+
) {}
1825

1926
private async setupServices(): Promise<void> {
2027
if (this.isCreated) return;
@@ -26,6 +33,16 @@ export class App {
2633
},
2734
});
2835
const subscriptionClient = new SubscriptionClient(subscriptionAxiosClient);
36+
37+
// Setup AnticaptureClient - use provided client or create new one
38+
const anticaptureAxiosClient = this.anticaptureHttpClient || axios.create({
39+
baseURL: this.anticaptureGraphqlEndpoint,
40+
headers: {
41+
'Content-Type': 'application/json',
42+
},
43+
});
44+
const anticaptureClient = new AnticaptureClient(anticaptureAxiosClient);
45+
2946
this.rabbitmqConnection = new RabbitMQConnection(this.rabbitmqUrl);
3047
await this.rabbitmqConnection.connect();
3148
const publisher = await RabbitMQPublisher.create(this.rabbitmqConnection);
@@ -48,6 +65,12 @@ export class App {
4865
new ProposalFinishedTriggerHandler(subscriptionClient, notificationFactory)
4966
);
5067

68+
// Add second handler for proposal-finished to process non-voting addresses
69+
triggerProcessorService.addHandler(
70+
'proposal-finished',
71+
new NonVotingHandler(subscriptionClient, notificationFactory, anticaptureClient)
72+
);
73+
5174
this.rabbitMQConsumerService = new RabbitMQConsumerService(this.rabbitmqUrl, triggerProcessorService);
5275
this.isCreated = true;
5376
}

apps/dispatcher/src/envConfig.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { z } from 'zod';
1010
const envSchema = z.object({
1111
SUBSCRIPTION_SERVER_URL: z.string().url(),
1212
RABBITMQ_URL: z.string().url(),
13+
ANTICAPTURE_GRAPHQL_ENDPOINT: z.string().url(),
1314
});
1415

1516
export function loadConfig() {
@@ -19,5 +20,6 @@ export function loadConfig() {
1920
return {
2021
subscriptionServerUrl: env.SUBSCRIPTION_SERVER_URL,
2122
rabbitmqUrl: env.RABBITMQ_URL,
23+
anticaptureGraphqlEndpoint: env.ANTICAPTURE_GRAPHQL_ENDPOINT,
2224
} as const;
2325
}

apps/dispatcher/src/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ import { App } from './app';
22
import { loadConfig } from './envConfig';
33

44
const config = loadConfig();
5-
const app = new App(config.subscriptionServerUrl, config.rabbitmqUrl);
5+
const app = new App(
6+
config.subscriptionServerUrl,
7+
config.rabbitmqUrl,
8+
config.anticaptureGraphqlEndpoint
9+
);
610

711
(async () => {
812
await app.start();

apps/dispatcher/src/interfaces/subscription-client.interface.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,17 @@ export interface ISubscriptionClient {
3939
*/
4040
shouldSend(subscribers: User[], eventId: string, daoId: string): Promise<Notification[]>;
4141

42+
/**
43+
* Filters multiple groups of subscribers in batch
44+
* @param requests Array of shouldSend requests
45+
* @returns Array of notification arrays corresponding to each request
46+
*/
47+
shouldSendBatch(requests: Array<{
48+
subscribers: User[];
49+
eventId: string;
50+
daoId: string;
51+
}>): Promise<Notification[][]>;
52+
4253
/**
4354
* Marks notifications as sent for successful deliveries
4455
* @param notifications List of notifications to mark as sent
@@ -51,4 +62,18 @@ export interface ISubscriptionClient {
5162
* @returns List of users who own the address
5263
*/
5364
getWalletOwners(address: string): Promise<User[]>;
65+
66+
/**
67+
* Get users who own specific wallet addresses (batch operation)
68+
* @param addresses Array of wallet addresses
69+
* @returns Record mapping addresses to arrays of users who own each address
70+
*/
71+
getWalletOwnersBatch(addresses: string[]): Promise<Record<string, User[]>>;
72+
73+
/**
74+
* Get all unique addresses being followed by users in a specific DAO
75+
* @param daoId The DAO ID
76+
* @returns List of unique addresses being followed
77+
*/
78+
getFollowedAddresses(daoId: string): Promise<string[]>;
5479
}
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
import { ISubscriptionClient, User, Notification } from '../interfaces/subscription-client.interface';
2+
import { NotificationClientFactory } from './notification/notification-factory.service';
3+
4+
/**
5+
* Service for handling batch notification processing
6+
* Provides reusable batch operations for sending notifications efficiently
7+
*/
8+
export class BatchNotificationService {
9+
constructor(
10+
private readonly subscriptionClient: ISubscriptionClient,
11+
private readonly notificationFactory: NotificationClientFactory
12+
) {}
13+
14+
/**
15+
* Prepares batch data by fetching followers and applying deduplication
16+
* @param addresses - Addresses to process (e.g., non-voting addresses)
17+
* @param daoId - DAO identifier
18+
* @param eventIdGenerator - Function to generate unique event IDs for each address
19+
* @returns Prepared batch data with followers and notifications to send
20+
*/
21+
async prepareBatchData(
22+
addresses: string[],
23+
daoId: string,
24+
eventIdGenerator: (address: string) => string
25+
): Promise<BatchNotificationData[]> {
26+
// Batch fetch all followers for all addresses in a single request
27+
const addressFollowersMap = await this.subscriptionClient.getWalletOwnersBatch(addresses);
28+
const addressFollowers = addresses
29+
.map(address => ({ address, followers: addressFollowersMap[address] || [] }))
30+
.filter(af => af.followers.length > 0);
31+
32+
if (addressFollowers.length === 0) {
33+
return [];
34+
}
35+
36+
// Batch check deduplication for all addresses in one request
37+
const shouldSendRequests = addressFollowers.map(({ address, followers }) => ({
38+
subscribers: followers,
39+
eventId: eventIdGenerator(address),
40+
daoId
41+
}));
42+
43+
const batchResults = await this.subscriptionClient.shouldSendBatch(shouldSendRequests);
44+
45+
// Map results back to the original structure
46+
return addressFollowers.map(({ address, followers }, index) => ({
47+
address,
48+
followers,
49+
notificationsToSend: batchResults[index] || []
50+
}));
51+
}
52+
53+
/**
54+
* Filters batch data to only include items with notifications to send
55+
* @param batchData - Batch data from prepareBatchData
56+
* @returns Filtered data with valid notifications
57+
*/
58+
filterValidNotifications(batchData: BatchNotificationData[]): BatchNotificationData[] {
59+
return batchData.filter(result => result.notificationsToSend.length > 0);
60+
}
61+
62+
/**
63+
* Executes parallel sending of notifications and marks them as sent
64+
* @param validNotifications - Notifications ready to be sent
65+
* @param messageGenerator - Function to generate message for each address
66+
* @param metadataGenerator - Optional function to generate metadata for each notification
67+
*/
68+
async executeBatchSend(
69+
validNotifications: BatchNotificationData[],
70+
messageGenerator: (address: string) => string,
71+
metadataGenerator?: (address: string) => Record<string, any>
72+
): Promise<void> {
73+
const sendPromises: Promise<void>[] = [];
74+
const allNotificationsToMark: Notification[] = [];
75+
76+
for (const { address, followers, notificationsToSend } of validNotifications) {
77+
const followerMap = new Map(followers.map((f: User) => [f.id, f]));
78+
const message = messageGenerator(address);
79+
const metadata = metadataGenerator ? metadataGenerator(address) : undefined;
80+
81+
this.queueNotificationSends(
82+
notificationsToSend,
83+
followerMap,
84+
message,
85+
metadata,
86+
sendPromises
87+
);
88+
89+
allNotificationsToMark.push(...notificationsToSend);
90+
}
91+
92+
// Execute all sends in parallel and mark as sent
93+
await Promise.all([
94+
Promise.all(sendPromises),
95+
this.subscriptionClient.markAsSent(allNotificationsToMark)
96+
]);
97+
}
98+
99+
/**
100+
* Queues individual notification sends for parallel execution
101+
* @param notificationsToSend - Notifications to queue
102+
* @param followerMap - Map of follower IDs to follower data
103+
* @param message - Formatted notification message
104+
* @param metadata - Optional metadata for the notification
105+
* @param sendPromises - Array to collect send promises
106+
*/
107+
private queueNotificationSends(
108+
notificationsToSend: Notification[],
109+
followerMap: Map<string, User>,
110+
message: string,
111+
metadata: Record<string, any> | undefined,
112+
sendPromises: Promise<void>[]
113+
): void {
114+
for (const notification of notificationsToSend) {
115+
const follower = followerMap.get(notification.user_id);
116+
if (!follower) continue;
117+
118+
sendPromises.push(
119+
this.notificationFactory
120+
.getClient(follower.channel)
121+
.sendNotification({
122+
userId: follower.id,
123+
channel: follower.channel,
124+
channelUserId: follower.channel_user_id,
125+
message,
126+
metadata
127+
})
128+
.catch(error => {
129+
console.error(`Failed to send notification to user ${follower.id}:`, error);
130+
})
131+
);
132+
}
133+
}
134+
135+
/**
136+
* Simplified method to send notifications to multiple addresses
137+
* Combines prepare, filter, and execute steps
138+
* @param addresses - Addresses to process
139+
* @param daoId - DAO identifier
140+
* @param eventIdGenerator - Function to generate unique event IDs
141+
* @param messageGenerator - Function to generate messages
142+
* @param metadataGenerator - Optional function to generate metadata
143+
*/
144+
async sendBatchNotifications(
145+
addresses: string[],
146+
daoId: string,
147+
eventIdGenerator: (address: string) => string,
148+
messageGenerator: (address: string) => string,
149+
metadataGenerator?: (address: string) => Record<string, any>
150+
): Promise<void> {
151+
const batchData = await this.prepareBatchData(addresses, daoId, eventIdGenerator);
152+
const validNotifications = this.filterValidNotifications(batchData);
153+
154+
if (validNotifications.length === 0) {
155+
return;
156+
}
157+
158+
await this.executeBatchSend(validNotifications, messageGenerator, metadataGenerator);
159+
}
160+
}
161+
162+
/**
163+
* Interface for batch notification data
164+
*/
165+
export interface BatchNotificationData {
166+
address: string;
167+
followers: User[];
168+
notificationsToSend: Notification[];
169+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/**
2+
* Service for formatting data for display in notifications
3+
*/
4+
export class FormattingService {
5+
/**
6+
* Formats an address for display (0x1234...5678)
7+
* @param address - Full address to format
8+
* @returns Shortened address format
9+
*/
10+
static formatAddress(address: string): string {
11+
if (address.length < 10) return address;
12+
return `${address.slice(0, 6)}...${address.slice(-4)}`;
13+
}
14+
15+
/**
16+
* Formats a list of proposals for display in notifications
17+
* @param proposals - Proposals to format
18+
* @returns Formatted proposal list with bullet points
19+
*/
20+
static formatProposalList(proposals: any[]): string {
21+
return proposals
22+
.map(p => {
23+
const title = p.title || p.description.split('\n')[0].replace(/^#+\s*/, '').trim();
24+
return `• ${title}`;
25+
})
26+
.join('\n');
27+
}
28+
29+
/**
30+
* Creates a formatted non-voting alert message
31+
* @param address - Non-voting address
32+
* @param daoId - DAO identifier
33+
* @param proposalsCount - Number of proposals checked
34+
* @param proposalTitles - Formatted list of proposal titles
35+
* @returns Formatted alert message
36+
*/
37+
static createNonVotingAlertMessage(
38+
address: string,
39+
daoId: string,
40+
proposalsCount: number,
41+
proposalTitles: string
42+
): string {
43+
const formattedAddress = FormattingService.formatAddress(address);
44+
45+
return `⚠️ Non-Voting Alert for DAO ${daoId.toUpperCase()}
46+
47+
The address ${formattedAddress} that you follow hasn't voted in the last ${proposalsCount} proposals:
48+
49+
${proposalTitles}
50+
51+
Consider reaching out to encourage participation!`;
52+
}
53+
}

0 commit comments

Comments
 (0)