Skip to content

Commit 92b4ff2

Browse files
Merge pull request #142 from blockful/dev
Dev
2 parents 970bb2a + dd85611 commit 92b4ff2

23 files changed

Lines changed: 351 additions & 205 deletions

apps/consumers/src/app.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ export class App {
4747
if (this.rabbitmqConsumerService) {
4848
await this.rabbitmqConsumerService.stop();
4949
}
50+
5051
this.telegramBotService.stop('SIGINT');
5152
}
5253
}

apps/consumers/src/services/rabbitmq-notification-consumer.service.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,20 @@ export class RabbitMQNotificationConsumerService {
3535
}
3636

3737
async stop(): Promise<void> {
38-
await this.consumer.close();
39-
await this.connection.close();
38+
if (this.consumer) {
39+
await this.consumer.close();
40+
}
41+
42+
if (this.connection) {
43+
await this.connection.close();
44+
}
4045
}
4146

4247
private async processNotification(message: RabbitMQMessage<NotificationPayload>): Promise<void> {
4348
if (message.type !== 'NOTIFICATION_EVENT') {
4449
return;
4550
}
51+
4652
try {
4753
await this.telegramBotService.sendNotification(message.payload);
4854
} catch (error: any) {

apps/dispatcher/src/app.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { AnticaptureClient } from '@notification-system/anticapture-client';
1414
export class App {
1515
private rabbitMQConsumerService!: RabbitMQConsumerService;
1616
private rabbitmqConnection!: RabbitMQConnection;
17+
private publisher!: RabbitMQPublisher;
1718
private isCreated = false;
1819

1920
constructor(
@@ -45,9 +46,9 @@ export class App {
4546

4647
this.rabbitmqConnection = new RabbitMQConnection(this.rabbitmqUrl);
4748
await this.rabbitmqConnection.connect();
48-
const publisher = await RabbitMQPublisher.create(this.rabbitmqConnection);
49+
this.publisher = await RabbitMQPublisher.create(this.rabbitmqConnection);
4950
const notificationFactory = new NotificationClientFactory();
50-
notificationFactory.addClient('telegram', new RabbitMQNotificationService(publisher));
51+
notificationFactory.addClient('telegram', new RabbitMQNotificationService(this.publisher));
5152
const triggerProcessorService = new TriggerProcessorService();
5253

5354
triggerProcessorService.addHandler(
@@ -82,7 +83,16 @@ export class App {
8283
}
8384

8485
async stop(): Promise<void> {
85-
await this.rabbitMQConsumerService?.stop();
86-
await this.rabbitmqConnection?.close();
86+
if (this.rabbitMQConsumerService) {
87+
await this.rabbitMQConsumerService.stop();
88+
}
89+
90+
if (this.publisher) {
91+
await this.publisher.close();
92+
}
93+
94+
if (this.rabbitmqConnection) {
95+
await this.rabbitmqConnection.close();
96+
}
8797
}
8898
}

apps/dispatcher/src/services/rabbitmq-consumer.service.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@ export class RabbitMQConsumerService {
2323
}
2424

2525
async stop(): Promise<void> {
26-
await this.consumer.close();
27-
await this.connection.close();
26+
if (this.consumer) {
27+
await this.consumer.close();
28+
}
29+
30+
if (this.connection) {
31+
await this.connection.close();
32+
}
2833
}
2934
}

apps/dispatcher/src/services/triggers/non-voting-handler.service.test.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,9 @@ describe('NonVotingHandler', () => {
107107
expect(result.messageId).toBe('proposal-finished');
108108
expect(mockAnticaptureClient.listProposals).toHaveBeenCalledWith(
109109
expect.objectContaining({
110-
status: expect.arrayContaining(['SUCCEEDED']),
111-
limit: 3
110+
status: expect.arrayContaining(['EXECUTED', 'SUCCEEDED', 'DEFEATED', 'EXPIRED', 'CANCELED']),
111+
limit: 15,
112+
orderDirection: 'desc'
112113
}),
113114
'ENS'
114115
);

apps/integrated-tests/jest.config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ export default {
88
globalTeardown: '<rootDir>/src/setup/jest/jest-global-teardown.ts',
99
setupFilesAfterEnv: ['<rootDir>/src/setup/jest/jest-setup-after-env.ts'],
1010
forceExit: true,
11-
silent: true,
11+
silent: false,
1212
verbose: true,
1313
};

apps/integrated-tests/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,13 @@
1515
"@notification-system/rabbitmq-client": "workspace:*",
1616
"@notification-system/subscription-server": "workspace:*",
1717
"@testcontainers/rabbitmq": "^10.15.0",
18-
"@types/amqplib": "^0.10.5",
1918
"@types/jest": "^29.5.14",
2019
"@types/node": "^20.17.46",
2120
"@types/uuid": "^9.0.0",
22-
"amqplib": "^0.10.4",
2321
"dotenv": "^16.5.0",
2422
"jest": "^29.7.0",
2523
"knex": "^3.1.0",
24+
"rabbitmq-client": "^5.0.5",
2625
"sqlite3": "^5.1.7",
2726
"ts-jest": "^29.3.2",
2827
"typescript": "^5.8.3",

apps/integrated-tests/src/config/services.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@
22
* Service configuration for integration tests
33
*/
44

5+
// Generate random port to avoid conflicts in parallel tests
6+
const randomPort = 14000 + Math.floor(Math.random() * 1000);
7+
58
export const serviceConfig = {
69
// Service ports
710
ports: {
8-
subscriptionServer: 14001,
11+
subscriptionServer: randomPort,
912
},
1013

1114
// Service URLs
1215
urls: {
13-
subscriptionServer: 'http://localhost:14001',
16+
subscriptionServer: `http://localhost:${randomPort}`,
1417
},
1518

1619
// Logic system configuration

apps/integrated-tests/src/setup/jest/jest-global-setup.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,18 @@ declare global {
1313
*/
1414
export default async function globalSetup() {
1515
const container = await new RabbitMQContainer()
16-
.withStartupTimeout(150000)
16+
.withStartupTimeout(30000)
1717
.start();
1818

19-
const amqpUrl = container.getAmqpUrl();
19+
let amqpUrl = container.getAmqpUrl();
20+
21+
// Fix URL to include credentials (testcontainers doesn't include them)
22+
const urlObj = new URL(amqpUrl);
23+
if (!urlObj.username && !urlObj.password) {
24+
urlObj.username = 'guest';
25+
urlObj.password = 'guest';
26+
}
27+
amqpUrl = urlObj.toString();
2028

2129
// Store RabbitMQ URL in environment variable for tests to access
2230
process.env.TEST_RABBITMQ_URL = amqpUrl;

apps/integrated-tests/src/setup/rabbitmq-setup.ts

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ import { RabbitMQSetupConfig } from './types/rabbitmq-setup.types';
1212
class GlobalRabbitMQSetup {
1313
private containerManager = new RabbitMQContainerManager();
1414
private connectionManager = new RabbitMQConnectionManager();
15-
private queueManager = new RabbitMQQueueManager(this.connectionManager);
16-
private spyConsumerManager = new RabbitMQSpyConsumerManager(this.connectionManager);
15+
private queueManager: RabbitMQQueueManager | null = null;
16+
private spyConsumerManager: RabbitMQSpyConsumerManager | null = null;
1717
private eventCollector = new EventCollector();
1818
private isStarted = false;
19+
private amqpUrl: string | null = null;
1920

2021
async getOrCreateSetup(): Promise<RabbitMQSetupConfig> {
2122
if (this.isStarted) {
@@ -33,6 +34,12 @@ class GlobalRabbitMQSetup {
3334
// Setup container and connection
3435
const container = await this.containerManager.getContainer();
3536
const amqpUrl = container.getAmqpUrl();
37+
this.amqpUrl = amqpUrl;
38+
39+
// Initialize managers with the correct URL
40+
this.queueManager = new RabbitMQQueueManager(this.connectionManager, amqpUrl);
41+
this.spyConsumerManager = new RabbitMQSpyConsumerManager(this.connectionManager, amqpUrl);
42+
3643
await this.connectionManager.initialize(amqpUrl);
3744

3845
// Setup initial queues and spy consumers
@@ -50,22 +57,30 @@ class GlobalRabbitMQSetup {
5057
}
5158

5259
private async setupInitialQueues(): Promise<void> {
53-
await this.queueManager.clearQueue('dispatcher-queue');
54-
await this.spyConsumerManager.setupSpyConsumer({
55-
queueName: 'dispatcher-queue',
56-
eventCollector: this.eventCollector
57-
});
60+
if (this.queueManager && this.spyConsumerManager) {
61+
await this.queueManager.clearQueue('dispatcher-queue');
62+
await this.spyConsumerManager.setupSpyConsumer({
63+
queueName: 'dispatcher-queue',
64+
eventCollector: this.eventCollector
65+
});
66+
}
5867
}
5968

6069
async globalCleanup(): Promise<void> {
6170
if (!this.isStarted) return;
6271

63-
await this.spyConsumerManager.cleanup();
72+
if (this.spyConsumerManager) {
73+
await this.spyConsumerManager.cleanup();
74+
}
75+
if (this.queueManager) {
76+
await this.queueManager.cleanup();
77+
}
6478
await this.connectionManager.cleanup();
6579
await this.containerManager.cleanup();
6680

6781
this.isStarted = false;
6882
this.eventCollector.clear();
83+
this.amqpUrl = null;
6984
}
7085
}
7186

@@ -101,8 +116,8 @@ export class RabbitMQTestSetup {
101116
this.connectionManager = new RabbitMQConnectionManager();
102117
await this.connectionManager.initialize(amqpUrl);
103118

104-
this.queueManager = new RabbitMQQueueManager(this.connectionManager);
105-
this.spyConsumerManager = new RabbitMQSpyConsumerManager(this.connectionManager);
119+
this.queueManager = new RabbitMQQueueManager(this.connectionManager, amqpUrl);
120+
this.spyConsumerManager = new RabbitMQSpyConsumerManager(this.connectionManager, amqpUrl);
106121

107122
// Setup spy consumer for dispatcher queue
108123
await this.spyConsumerManager.setupSpyConsumer({
@@ -118,6 +133,7 @@ export class RabbitMQTestSetup {
118133

119134
if (this.queueManager) {
120135
await this.queueManager.clearQueue('dispatcher-queue');
136+
await this.queueManager.cleanup();
121137
}
122138

123139
if (this.spyConsumerManager) {

0 commit comments

Comments
 (0)