@@ -8,7 +8,7 @@ import { SubmittableExtrinsic } from '@polkadot/api-base/types';
8
8
import { Codec , ISubmittableResult } from '@polkadot/types/types' ;
9
9
import { MILLISECONDS_PER_SECOND } from 'time-constants' ;
10
10
import { SchedulerRegistry } from '@nestjs/schedule' ;
11
- import { BlockchainService } from '#lib/blockchain/blockchain.service' ;
11
+ import { BlockchainService , ICapacityInfo } from '#lib/blockchain/blockchain.service' ;
12
12
import { createKeys } from '#lib/blockchain/create-keys' ;
13
13
import { NonceService } from '#lib/services/nonce.service' ;
14
14
import { TransactionType } from '#lib/types/enums' ;
@@ -18,6 +18,12 @@ import { RedisUtils, TransactionData } from 'libs/common/src';
18
18
import { ConfigService } from '#lib/config/config.service' ;
19
19
import { ITxStatus } from '#lib/interfaces/tx-status.interface' ;
20
20
import { HexString } from '@polkadot/util/types' ;
21
+ import {
22
+ CAPACITY_AVAILABLE_EVENT ,
23
+ CAPACITY_EXHAUSTED_EVENT ,
24
+ CapacityCheckerService ,
25
+ } from '#lib/blockchain/capacity-checker.service' ;
26
+ import { OnEvent } from '@nestjs/event-emitter' ;
21
27
22
28
export const SECONDS_PER_BLOCK = 12 ;
23
29
const CAPACITY_EPOCH_TIMEOUT_NAME = 'capacity_check' ;
@@ -29,7 +35,7 @@ const CAPACITY_EPOCH_TIMEOUT_NAME = 'capacity_check';
29
35
@Processor ( QueueConstants . TRANSACTION_PUBLISH_QUEUE )
30
36
export class TransactionPublisherService extends BaseConsumer implements OnApplicationShutdown {
31
37
public async onApplicationBootstrap ( ) {
32
- await this . checkCapacity ( ) ;
38
+ await this . capacityCheckerService . checkForSufficientCapacity ( ) ;
33
39
}
34
40
35
41
public async onApplicationShutdown ( _signal ?: string | undefined ) : Promise < void > {
@@ -48,6 +54,7 @@ export class TransactionPublisherService extends BaseConsumer implements OnAppli
48
54
private blockchainService : BlockchainService ,
49
55
private nonceService : NonceService ,
50
56
private schedulerRegistry : SchedulerRegistry ,
57
+ private capacityCheckerService : CapacityCheckerService ,
51
58
) {
52
59
super ( ) ;
53
60
}
@@ -61,7 +68,7 @@ export class TransactionPublisherService extends BaseConsumer implements OnAppli
61
68
let txHash : HexString ;
62
69
try {
63
70
// Check capacity first; if out of capacity, send job back to queue
64
- if ( ! ( await this . checkCapacity ( ) ) ) {
71
+ if ( ! ( await this . capacityCheckerService . checkForSufficientCapacity ( ) ) ) {
65
72
job . moveToDelayed ( Date . now ( ) , job . token ) ; // fake delay, we just want to avoid processing the current job if we're out of capacity
66
73
throw new DelayedError ( ) ;
67
74
}
@@ -114,11 +121,10 @@ export class TransactionPublisherService extends BaseConsumer implements OnAppli
114
121
obj [ txHash ] = JSON . stringify ( status ) ;
115
122
this . cacheManager . hset ( RedisUtils . TXN_WATCH_LIST_KEY , obj ) ;
116
123
} catch ( error : any ) {
117
- if ( error instanceof DelayedError ) {
118
- throw error ;
124
+ if ( ! ( error instanceof DelayedError ) ) {
125
+ this . logger . error ( 'Unknown error encountered: ' , error , error ?. stack ) ;
119
126
}
120
127
121
- this . logger . error ( 'Unknown error encountered: ' , error , error ?. stack ) ;
122
128
throw error ;
123
129
}
124
130
}
@@ -183,89 +189,59 @@ export class TransactionPublisherService extends BaseConsumer implements OnAppli
183
189
}
184
190
}
185
191
186
- /**
187
- * Checks the capacity of the account publisher and takes appropriate actions based on the capacity status.
188
- * If the capacity is exhausted, it pauses the account change publish queue and sets a timeout to check the capacity again.
189
- * If the capacity is refilled, it resumes the account change publish queue and clears the timeout.
190
- * If any jobs failed due to low balance/no capacity, it retries them.
191
- * If any error occurs during the capacity check, it logs the error.
192
- */
193
- private async checkCapacity ( ) : Promise < boolean > {
194
- let outOfCapacity = false ;
192
+ @OnEvent ( CAPACITY_EXHAUSTED_EVENT )
193
+ public async handleCapacityExhausted ( capacityInfo : ICapacityInfo ) {
194
+ await this . transactionPublishQueue . pause ( ) ;
195
+ const blocksRemaining = capacityInfo . nextEpochStart - capacityInfo . currentBlockNumber ;
196
+ const epochTimeout = blocksRemaining * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND ;
197
+ // Avoid spamming the log
198
+ if ( ! ( await this . transactionPublishQueue . isPaused ( ) ) ) {
199
+ this . logger . warn (
200
+ `Capacity Exhausted: Pausing account change publish queue until next epoch: ${ epochTimeout / 1000 } seconds` ,
201
+ ) ;
202
+ }
195
203
try {
196
- const { capacityLimit } = this . configService ;
197
- const capacityInfo = await this . blockchainService . capacityInfo ( this . configService . providerId ) ;
198
- const { remainingCapacity } = capacityInfo ;
199
- const { currentEpoch } = capacityInfo ;
200
- const epochCapacityKey = `epochCapacity:${ currentEpoch } ` ;
201
- const epochUsedCapacity = BigInt ( ( await this . cacheManager . get ( epochCapacityKey ) ) ?? 0 ) ; // Fetch capacity used by the service
202
- outOfCapacity = remainingCapacity <= 0n ;
203
-
204
- if ( ! outOfCapacity ) {
205
- this . logger . debug ( ` Capacity remaining: ${ remainingCapacity } ` ) ;
206
- if ( capacityLimit . type === 'percentage' ) {
207
- const capacityLimitPercentage = BigInt ( capacityLimit . value ) ;
208
- const capacityLimitThreshold = ( capacityInfo . totalCapacityIssued * capacityLimitPercentage ) / 100n ;
209
- this . logger . debug ( `Capacity limit threshold: ${ capacityLimitThreshold } ` ) ;
210
- if ( epochUsedCapacity >= capacityLimitThreshold ) {
211
- outOfCapacity = true ;
212
- this . logger . warn ( `Capacity threshold reached: used ${ epochUsedCapacity } of ${ capacityLimitThreshold } ` ) ;
213
- }
214
- } else if ( epochUsedCapacity >= capacityLimit . value ) {
215
- outOfCapacity = true ;
216
- this . logger . warn ( `Capacity threshold reached: used ${ epochUsedCapacity } of ${ capacityLimit . value } ` ) ;
217
- }
204
+ // Check if a timeout with the same name already exists
205
+ if ( this . schedulerRegistry . doesExist ( 'timeout' , CAPACITY_EPOCH_TIMEOUT_NAME ) ) {
206
+ // If it does, delete it
207
+ this . schedulerRegistry . deleteTimeout ( CAPACITY_EPOCH_TIMEOUT_NAME ) ;
218
208
}
219
209
220
- if ( outOfCapacity ) {
221
- await this . transactionPublishQueue . pause ( ) ;
222
- const blocksRemaining = capacityInfo . nextEpochStart - capacityInfo . currentBlockNumber ;
223
- const epochTimeout = blocksRemaining * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND ;
224
- this . logger . warn (
225
- `Capacity Exhausted: Pausing account change publish queue until next epoch: ${ epochTimeout / 1000 } seconds` ,
226
- ) ;
227
- try {
228
- // Check if a timeout with the same name already exists
229
- if ( this . schedulerRegistry . doesExist ( 'timeout' , CAPACITY_EPOCH_TIMEOUT_NAME ) ) {
230
- // If it does, delete it
231
- this . schedulerRegistry . deleteTimeout ( CAPACITY_EPOCH_TIMEOUT_NAME ) ;
232
- }
233
-
234
- // Add the new timeout
235
- this . schedulerRegistry . addTimeout (
236
- CAPACITY_EPOCH_TIMEOUT_NAME ,
237
- setTimeout ( ( ) => this . checkCapacity ( ) , epochTimeout ) ,
238
- ) ;
239
- } catch ( err ) {
240
- // Handle any errors
241
- console . error ( err ) ;
242
- }
243
- } else {
244
- this . logger . verbose ( 'Capacity Available: Resuming account change publish queue and clearing timeout' ) ;
245
- // Get the failed jobs and check if they failed due to capacity
246
- const failedJobs = await this . transactionPublishQueue . getFailed ( ) ;
247
- const capacityFailedJobs = failedJobs . filter ( ( job ) =>
248
- job . failedReason ?. includes ( '1010: Invalid Transaction: Inability to pay some fees' ) ,
249
- ) ;
250
- // Retry the failed jobs
251
- await Promise . all (
252
- capacityFailedJobs . map ( async ( job ) => {
253
- this . logger . debug ( `Retrying job ${ job . id } ` ) ;
254
- job . retry ( ) ;
255
- } ) ,
256
- ) ;
257
- try {
258
- this . schedulerRegistry . deleteTimeout ( CAPACITY_EPOCH_TIMEOUT_NAME ) ;
259
- } catch ( err ) {
260
- // ignore
261
- }
210
+ // Add the new timeout
211
+ this . schedulerRegistry . addTimeout (
212
+ CAPACITY_EPOCH_TIMEOUT_NAME ,
213
+ setTimeout ( ( ) => this . capacityCheckerService . checkForSufficientCapacity ( ) , epochTimeout ) ,
214
+ ) ;
215
+ } catch ( err ) {
216
+ // Handle any errors
217
+ console . error ( err ) ;
218
+ }
219
+ }
262
220
263
- await this . transactionPublishQueue . resume ( ) ;
264
- }
221
+ @OnEvent ( CAPACITY_AVAILABLE_EVENT )
222
+ public async handleCapacityAvailable ( ) {
223
+ // Avoid spamming the log
224
+ if ( await this . transactionPublishQueue . isPaused ( ) ) {
225
+ this . logger . verbose ( 'Capacity Available: Resuming account change publish queue and clearing timeout' ) ;
226
+ }
227
+ // Get the failed jobs and check if they failed due to capacity
228
+ const failedJobs = await this . transactionPublishQueue . getFailed ( ) ;
229
+ const capacityFailedJobs = failedJobs . filter ( ( job ) =>
230
+ job . failedReason ?. includes ( '1010: Invalid Transaction: Inability to pay some fees' ) ,
231
+ ) ;
232
+ // Retry the failed jobs
233
+ await Promise . all (
234
+ capacityFailedJobs . map ( async ( job ) => {
235
+ this . logger . debug ( `Retrying job ${ job . id } ` ) ;
236
+ job . retry ( ) ;
237
+ } ) ,
238
+ ) ;
239
+ try {
240
+ this . schedulerRegistry . deleteTimeout ( CAPACITY_EPOCH_TIMEOUT_NAME ) ;
265
241
} catch ( err ) {
266
- this . logger . error ( 'Caught error in checkCapacity' , err ) ;
242
+ // ignore
267
243
}
268
244
269
- return ! outOfCapacity ;
245
+ await this . transactionPublishQueue . resume ( ) ;
270
246
}
271
247
}
0 commit comments