Skip to content

Commit 0aaa9a8

Browse files
authored
Refactor bull queues to a common module for shared configuration (#308)
# Description This PR does the following: - Refactor multiple disparate instantiations of `BullModule.forRootAsync()` into a single, global `QueueModule`. This has the following benefits: - Single location for configuration of queues & default parameters - Proper use of NestJS dependency injection to avoid unnecessary separate instantiations of `BullModule`; cuts down on app memory profile - Add a prefix to all Redis and BullMq cache entries to distinguish from other apps using Redis
1 parent 713c362 commit 0aaa9a8

25 files changed

+135
-355
lines changed

services/account/libs/common/src/types/dtos/accounts.response.dto.ts

+12
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,18 @@
11
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
22
import { IsNotEmpty, IsOptional } from 'class-validator';
33
import type { HandleResponse } from '@frequency-chain/api-augment/interfaces';
4+
import { Text, u16 } from '@polkadot/types';
5+
6+
export class HandleResponseDTO implements HandleResponse {
7+
@ApiProperty()
8+
base_handle: Text;
9+
10+
@ApiProperty()
11+
canonical_base: Text;
12+
13+
@ApiProperty()
14+
suffix: u16;
15+
}
416

517
export class AccountResponse {
618
@ApiProperty()

services/graph/ENVIRONMENT.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ This application recognizes the following environment variables:
55
| Name | Description | Range/Type | Required? | Default |
66
| ----------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | :--------------------------------: | :----------: | :-----: |
77
| `API_PORT` | HTTP port that the application listens on | 1025 - 65535 | | 3000 |
8+
| `CACHE_KEY_PREFIX` | Prefix to use for Redis cache keys | string | | content-watcher: |
89
| `CAPACITY_LIMIT` | Maximum amount of provider capacity this app is allowed to use (per epoch) type: 'percentage' 'amount' value: number (may be percentage, ie '80', or absolute amount of capacity) | JSON [(example)](./env.template) | Y | |
910
|`DEBOUNCE_SECONDS`|Number of seconds to retain pending graph updates in the Redis cache to avoid redundant fetches from the chain|>= 0||| | > 0 | | 100 |
1011
| `FREQUENCY_URL` | Blockchain node address | http(s): or ws(s): URL | Y | |

services/graph/apps/api/src/api.module.ts

+5-67
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { Module } from '@nestjs/common';
22
import { EventEmitterModule } from '@nestjs/event-emitter';
3-
import { BullModule } from '@nestjs/bullmq';
43
import { ScheduleModule } from '@nestjs/schedule';
54
import { RedisModule } from '@songkeys/nestjs-redis';
65
import { BullBoardModule } from '@bull-board/nestjs';
@@ -9,10 +8,10 @@ import { ExpressAdapter } from '@bull-board/express';
98
import { GraphControllerV1 } from './controllers/v1/graph-v1.controller';
109
import { HealthController } from './controllers/health.controller';
1110
import { ApiService } from './api.service';
12-
import { BlockchainModule, ConfigModule, ConfigService, GraphStateManager, SECONDS_PER_BLOCK } from '#lib';
13-
import * as QueueConstants from '#lib/utils/queues';
11+
import { BlockchainModule, ConfigModule, ConfigService, GraphStateManager } from '#lib';
12+
import * as QueueConstants from '#lib/queues/queue-constants';
1413
import { WebhooksControllerV1 } from './controllers/v1/webhooks-v1.controller';
15-
import { MILLISECONDS_PER_SECOND } from 'time-constants';
14+
import { QueueModule } from '#lib/queues/queue.module';
1615

1716
@Module({
1817
imports: [
@@ -22,35 +21,13 @@ import { MILLISECONDS_PER_SECOND } from 'time-constants';
2221
{
2322
imports: [ConfigModule],
2423
useFactory: (configService: ConfigService) => ({
25-
config: [{ url: configService.redisUrl.toString() }],
24+
config: [{ url: configService.redisUrl.toString(), keyPrefix: configService.cacheKeyPrefix }],
2625
}),
2726
inject: [ConfigService],
2827
},
2928
true, // isGlobal
3029
),
31-
BullModule.forRootAsync({
32-
imports: [ConfigModule],
33-
useFactory: (configService: ConfigService) => {
34-
// Note: BullMQ doesn't honor a URL for the Redis connection, and
35-
// JS URL doesn't parse 'redis://' as a valid protocol, so we fool
36-
// it by changing the URL to use 'http://' in order to parse out
37-
// the host, port, username, password, etc.
38-
// We could pass REDIS_HOST, REDIS_PORT, etc, in the environment, but
39-
// trying to keep the # of environment variables from proliferating
40-
const url = new URL(configService.redisUrl.toString().replace(/^redis[s]*/, 'http'));
41-
const { hostname, port, username, password, pathname } = url;
42-
return {
43-
connection: {
44-
host: hostname || undefined,
45-
port: port ? Number(port) : undefined,
46-
username: username || undefined,
47-
password: password || undefined,
48-
db: pathname?.length > 1 ? Number(pathname.slice(1)) : undefined,
49-
},
50-
};
51-
},
52-
inject: [ConfigService],
53-
}),
30+
QueueModule,
5431
EventEmitterModule.forRoot({
5532
// Use this instance throughout the application
5633
global: true,
@@ -69,45 +46,6 @@ import { MILLISECONDS_PER_SECOND } from 'time-constants';
6946
// disable throwing uncaughtException if an error event is emitted and it has no listeners
7047
ignoreErrors: false,
7148
}),
72-
BullModule.registerQueue(
73-
{
74-
name: QueueConstants.GRAPH_CHANGE_REQUEST_QUEUE,
75-
defaultJobOptions: {
76-
removeOnComplete: false,
77-
removeOnFail: false,
78-
attempts: 3,
79-
},
80-
},
81-
{
82-
name: QueueConstants.GRAPH_CHANGE_PUBLISH_QUEUE,
83-
defaultJobOptions: {
84-
removeOnComplete: true,
85-
removeOnFail: false,
86-
attempts: 1,
87-
},
88-
},
89-
{
90-
name: QueueConstants.GRAPH_CHANGE_NOTIFY_QUEUE,
91-
defaultJobOptions: {
92-
removeOnComplete: true,
93-
removeOnFail: false,
94-
attempts: 10,
95-
delay: SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND,
96-
backoff: {
97-
type: 'fixed',
98-
delay: SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND,
99-
},
100-
},
101-
},
102-
{
103-
name: QueueConstants.RECONNECT_REQUEST_QUEUE,
104-
defaultJobOptions: {
105-
removeOnComplete: false,
106-
removeOnFail: false,
107-
attempts: 3,
108-
},
109-
},
110-
),
11149
// Bullboard UI
11250
BullBoardModule.forRoot({
11351
route: '/queues',

services/graph/apps/api/src/api.service.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import Redis from 'ioredis';
44
import { InjectQueue } from '@nestjs/bullmq';
55
import { Queue } from 'bullmq';
66
import { createHash } from 'crypto';
7-
import * as QueueConstants from '#lib/utils/queues';
7+
import * as QueueConstants from '#lib/queues/queue-constants';
88
import * as RedisConstants from '#lib/utils/redis';
99
import {
1010
AsyncDebouncerService,
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
import { BlockchainModule, BlockchainService, ConfigModule, ConfigService, GraphStateManager } from '#lib';
2-
import { BullModule } from '@nestjs/bullmq';
32
import { Module } from '@nestjs/common';
43
import { RedisModule } from '@songkeys/nestjs-redis';
54
import { GraphNotifierService } from './graph.monitor.processor.service';
6-
import * as QueueConstants from '#lib/utils/queues';
75

86
@Module({
97
imports: [
108
BlockchainModule,
11-
ConfigModule,
129
RedisModule.forRootAsync(
1310
{
1411
imports: [ConfigModule],
@@ -19,55 +16,8 @@ import * as QueueConstants from '#lib/utils/queues';
1916
},
2017
true, // isGlobal
2118
),
22-
BullModule.forRootAsync({
23-
imports: [ConfigModule],
24-
useFactory: (configService: ConfigService) => {
25-
// Note: BullMQ doesn't honor a URL for the Redis connection, and
26-
// JS URL doesn't parse 'redis://' as a valid protocol, so we fool
27-
// it by changing the URL to use 'http://' in order to parse out
28-
// the host, port, username, password, etc.
29-
// We could pass REDIS_HOST, REDIS_PORT, etc, in the environment, but
30-
// trying to keep the # of environment variables from proliferating
31-
const url = new URL(configService.redisUrl.toString().replace(/^redis[s]*/, 'http'));
32-
const { hostname, port, username, password, pathname } = url;
33-
return {
34-
connection: {
35-
host: hostname || undefined,
36-
port: port ? Number(port) : undefined,
37-
username: username || undefined,
38-
password: password || undefined,
39-
db: pathname?.length > 1 ? Number(pathname.slice(1)) : undefined,
40-
},
41-
};
42-
},
43-
inject: [ConfigService],
44-
}),
45-
BullModule.registerQueue(
46-
{
47-
name: QueueConstants.GRAPH_CHANGE_NOTIFY_QUEUE,
48-
defaultJobOptions: {
49-
removeOnComplete: true,
50-
removeOnFail: false,
51-
attempts: 3,
52-
backoff: {
53-
type: 'exponential',
54-
},
55-
},
56-
},
57-
{
58-
name: QueueConstants.GRAPH_CHANGE_REQUEST_QUEUE,
59-
defaultJobOptions: {
60-
removeOnComplete: true,
61-
removeOnFail: false,
62-
attempts: 3,
63-
},
64-
},
65-
{
66-
name: QueueConstants.RECONNECT_REQUEST_QUEUE,
67-
},
68-
),
6919
],
70-
providers: [GraphNotifierService, GraphStateManager, BlockchainService, ConfigService],
71-
exports: [BullModule, GraphNotifierService, BlockchainService, ConfigService],
20+
providers: [GraphNotifierService, GraphStateManager],
21+
exports: [GraphNotifierService],
7222
})
7323
export class GraphNotifierModule {}

services/graph/apps/worker/src/graph_notifier/graph.monitor.processor.service.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { MILLISECONDS_PER_SECOND } from 'time-constants';
77
import { RegistryError } from '@polkadot/types/types';
88
import axios from 'axios';
99
import { BaseConsumer, AsyncDebouncerService, BlockchainService, GraphStateManager, ITxMonitorJob, ProviderGraphUpdateJob, SECONDS_PER_BLOCK, ConfigService } from '#lib';
10-
import * as QueueConstants from '#lib/utils/queues';
10+
import * as QueueConstants from '#lib/queues/queue-constants';
1111
import * as RedisConstants from '#lib/utils/redis';
1212
import * as BlockchainConstants from '#lib/blockchain/blockchain-constants';
1313
import * as GraphServiceWebhook from '#lib/types/webhook-types';
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,12 @@
1-
/*
2-
https://docs.nestjs.com/modules
3-
*/
4-
5-
import { BullModule } from '@nestjs/bullmq';
61
import { Module } from '@nestjs/common';
7-
import { RedisModule } from '@songkeys/nestjs-redis';
8-
import * as QueueConstants from '#lib/utils/queues';
9-
import { BlockchainModule, ConfigModule, ConfigService, NonceService } from '#lib';
2+
import { BlockchainModule, NonceService } from '#lib';
103
import { GraphUpdatePublisherService } from './graph.publisher.processor.service';
114

125
@Module({
136
imports: [
147
BlockchainModule,
15-
ConfigModule,
16-
RedisModule.forRootAsync(
17-
{
18-
imports: [ConfigModule],
19-
useFactory: (configService: ConfigService) => ({
20-
config: [{ url: configService.redisUrl.toString() }],
21-
}),
22-
inject: [ConfigService],
23-
},
24-
true, // isGlobal
25-
),
26-
BullModule.forRootAsync({
27-
imports: [ConfigModule],
28-
useFactory: (configService: ConfigService) => {
29-
// Note: BullMQ doesn't honor a URL for the Redis connection, and
30-
// JS URL doesn't parse 'redis://' as a valid protocol, so we fool
31-
// it by changing the URL to use 'http://' in order to parse out
32-
// the host, port, username, password, etc.
33-
// We could pass REDIS_HOST, REDIS_PORT, etc, in the environment, but
34-
// trying to keep the # of environment variables from proliferating
35-
const url = new URL(configService.redisUrl.toString().replace(/^redis[s]*/, 'http'));
36-
const { hostname, port, username, password, pathname } = url;
37-
return {
38-
connection: {
39-
host: hostname || undefined,
40-
port: port ? Number(port) : undefined,
41-
username: username || undefined,
42-
password: password || undefined,
43-
db: pathname?.length > 1 ? Number(pathname.slice(1)) : undefined,
44-
},
45-
};
46-
},
47-
inject: [ConfigService],
48-
}),
49-
BullModule.registerQueue(
50-
{
51-
name: QueueConstants.GRAPH_CHANGE_NOTIFY_QUEUE,
52-
defaultJobOptions: {
53-
removeOnComplete: true,
54-
removeOnFail: false,
55-
},
56-
},
57-
{
58-
name: QueueConstants.GRAPH_CHANGE_PUBLISH_QUEUE,
59-
defaultJobOptions: {
60-
removeOnComplete: true,
61-
removeOnFail: false,
62-
},
63-
},
64-
),
658
],
669
providers: [GraphUpdatePublisherService, NonceService],
67-
exports: [BullModule, GraphUpdatePublisherService],
10+
exports: [GraphUpdatePublisherService, NonceService],
6811
})
6912
export class GraphUpdatePublisherModule {}

services/graph/apps/worker/src/graph_publisher/graph.publisher.processor.service.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { SubmittableExtrinsic } from '@polkadot/api-base/types';
99
import { ISubmittableResult } from '@polkadot/types/types';
1010
import { MILLISECONDS_PER_SECOND } from 'time-constants';
1111
import { SchedulerRegistry } from '@nestjs/schedule';
12-
import * as QueueConstants from '#lib/utils/queues';
12+
import * as QueueConstants from '#lib/queues/queue-constants';
1313
import { BaseConsumer, BlockchainService, ConfigService, GraphUpdateJob, ITxMonitorJob, NonceService, createKeys } from '#lib';
1414

1515
export const SECONDS_PER_BLOCK = 12;
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,12 @@
1-
import { BullModule } from '@nestjs/bullmq';
21
import { Module } from '@nestjs/common';
3-
import { RedisModule } from '@songkeys/nestjs-redis';
42
import { EventEmitterModule } from '@nestjs/event-emitter';
5-
import * as QueueConstants from '#lib/utils/queues';
6-
import { ConfigModule, ConfigService } from '#lib';
73
import { GraphReconnectionService } from './graph.reconnection.processor.service';
84

95
@Module({
106
imports: [
11-
ConfigModule,
127
EventEmitterModule,
13-
RedisModule.forRootAsync(
14-
{
15-
imports: [ConfigModule],
16-
useFactory: (configService: ConfigService) => ({
17-
config: [{ url: configService.redisUrl.toString() }],
18-
}),
19-
inject: [ConfigService],
20-
},
21-
true, // isGlobal
22-
),
23-
BullModule.forRootAsync({
24-
imports: [ConfigModule],
25-
useFactory: (configService: ConfigService) => {
26-
// Note: BullMQ doesn't honor a URL for the Redis connection, and
27-
// JS URL doesn't parse 'redis://' as a valid protocol, so we fool
28-
// it by changing the URL to use 'http://' in order to parse out
29-
// the host, port, username, password, etc.
30-
// We could pass REDIS_HOST, REDIS_PORT, etc, in the environment, but
31-
// trying to keep the # of environment variables from proliferating
32-
const url = new URL(configService.redisUrl.toString().replace(/^redis[s]*/, 'http'));
33-
const { hostname, port, username, password, pathname } = url;
34-
return {
35-
connection: {
36-
host: hostname || undefined,
37-
port: port ? Number(port) : undefined,
38-
username: username || undefined,
39-
password: password || undefined,
40-
db: pathname?.length > 1 ? Number(pathname.slice(1)) : undefined,
41-
},
42-
};
43-
},
44-
inject: [ConfigService],
45-
}),
46-
BullModule.registerQueue(
47-
{
48-
name: QueueConstants.RECONNECT_REQUEST_QUEUE,
49-
defaultJobOptions: {
50-
removeOnComplete: false,
51-
removeOnFail: false,
52-
attempts: 3,
53-
},
54-
},
55-
{
56-
name: QueueConstants.GRAPH_CHANGE_REQUEST_QUEUE,
57-
defaultJobOptions: {
58-
removeOnComplete: false,
59-
removeOnFail: false,
60-
attempts: 3,
61-
},
62-
},
63-
),
648
],
659
providers: [GraphReconnectionService],
66-
exports: [BullModule, GraphReconnectionService],
10+
exports: [GraphReconnectionService],
6711
})
6812
export class GraphReconnectionModule {}

services/graph/apps/worker/src/reconnection_processor/graph.reconnection.processor.service.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { Job, Queue } from 'bullmq';
44
import { EventEmitter2, OnEvent } from '@nestjs/event-emitter';
55
import { MessageSourceId, ProviderId } from '@frequency-chain/api-augment/interfaces';
66
import { AxiosError, AxiosResponse } from 'axios';
7-
import * as QueueConstants from '#lib/utils/queues';
7+
import * as QueueConstants from '#lib/queues/queue-constants';
88
import { BaseConsumer, ConfigService, ConnectionDto, GraphKeyPairDto, IGraphUpdateJob, ProviderGraphUpdateJob, ProviderWebhookService } from '#lib';
99

1010
@Injectable()

0 commit comments

Comments
 (0)