@@ -34,10 +34,10 @@ import * as cluster from "cluster";
34
34
import { Worker } from "cluster" ;
35
35
import { HyperionWorkerDef } from "../interfaces/hyperionWorkerDef" ;
36
36
import { HyperionConfig } from "../interfaces/hyperionConfig" ;
37
- import moment = require( "moment" ) ;
38
- import Timeout = NodeJS . Timeout ;
39
37
40
38
import { AsyncQueue , queue } from "async" ;
39
+ import moment = require( "moment" ) ;
40
+ import Timeout = NodeJS . Timeout ;
41
41
42
42
export class HyperionMaster {
43
43
@@ -86,7 +86,6 @@ export class HyperionMaster {
86
86
private lastProducer : string = null ;
87
87
private handoffCounter : number = 0 ;
88
88
private missedRounds : object = { } ;
89
- private blockMsgQueue : any [ ] = [ ] ;
90
89
91
90
// IPC Messaging
92
91
private totalMessages = 0 ;
@@ -134,6 +133,7 @@ export class HyperionMaster {
134
133
private proposedSchedule : any ;
135
134
private wsRouterWorker : cluster . Worker ;
136
135
private liveBlockQueue : AsyncQueue < any > ;
136
+ private readingPaused = false ;
137
137
138
138
139
139
constructor ( ) {
@@ -1140,17 +1140,65 @@ export class HyperionMaster {
1140
1140
} , 5000 ) ;
1141
1141
}
1142
1142
1143
- private monitorIndexingQueues ( ) {
1144
- const limit = this . conf . scaling . auto_scale_trigger ;
1145
- const autoscaleConsumers = { } ;
1146
- setInterval ( async ( ) => {
1147
- const testedQueues = new Set ( ) ;
1148
- for ( const worker of this . workerMap ) {
1149
- if ( worker . worker_role === 'ingestor' ) {
1150
- const queue = worker . queue ;
1151
- if ( ! testedQueues . has ( queue ) ) {
1152
- testedQueues . add ( queue ) ;
1153
- const size = await this . manager . checkQueueSize ( queue ) ;
1143
+ private async checkQueues ( autoscaleConsumers , limit ) {
1144
+ const testedQueues = new Set ( ) ;
1145
+ for ( const worker of this . workerMap ) {
1146
+ let queue = worker . queue ;
1147
+
1148
+ if ( worker . worker_role === 'ds_pool_worker' ) {
1149
+ queue = `${ this . chain } :ds_pool:${ worker . local_id } ` ;
1150
+ }
1151
+
1152
+ if ( queue ) {
1153
+ if ( ! testedQueues . has ( queue ) ) {
1154
+ testedQueues . add ( queue ) ;
1155
+ const size = await this . manager . checkQueueSize ( queue ) ;
1156
+
1157
+
1158
+ // pause readers if queues are above the max_limit
1159
+ if ( size >= this . conf . scaling . max_queue_limit ) {
1160
+ this . readingPaused = true ;
1161
+ for ( const worker of this . workerMap ) {
1162
+ if ( worker . worker_role === 'reader' ) {
1163
+ worker . wref . send ( { event : 'pause' } ) ;
1164
+ }
1165
+ }
1166
+ }
1167
+
1168
+ // resume readers if the queues are below the trigger point
1169
+ if ( ( this . readingPaused && size <= this . conf . scaling . resume_trigger ) ) {
1170
+ this . readingPaused = false ;
1171
+ for ( const worker of this . workerMap ) {
1172
+ if ( worker . worker_role === 'reader' ) {
1173
+ worker . wref . send ( { event : 'pause' } ) ;
1174
+ worker . wref . send ( {
1175
+ event : 'set_delay' ,
1176
+ data : {
1177
+ state : false ,
1178
+ delay : 0
1179
+ }
1180
+ } ) ;
1181
+ }
1182
+ }
1183
+ }
1184
+
1185
+ // apply block processing delay if 20% below max
1186
+ if ( size >= this . conf . scaling . max_queue_limit * 0.8 ) {
1187
+ for ( const worker of this . workerMap ) {
1188
+ if ( worker . worker_role === 'reader' ) {
1189
+ worker . wref . send ( {
1190
+ event : 'set_delay' ,
1191
+ data : {
1192
+ state : true ,
1193
+ delay : 500
1194
+ }
1195
+ } ) ;
1196
+ }
1197
+ }
1198
+ }
1199
+
1200
+
1201
+ if ( worker . worker_role === 'ingestor' ) {
1154
1202
if ( size > limit ) {
1155
1203
if ( ! autoscaleConsumers [ queue ] ) {
1156
1204
autoscaleConsumers [ queue ] = 0 ;
@@ -1164,14 +1212,24 @@ export class HyperionMaster {
1164
1212
} ) ;
1165
1213
this . launchWorkers ( ) ;
1166
1214
autoscaleConsumers [ queue ] ++ ;
1167
- } else {
1168
- // hLog(`WARN: Max consumer limit reached on ${queue}!`);
1169
1215
}
1170
1216
}
1171
1217
}
1172
1218
}
1173
1219
}
1174
- } , 20000 ) ;
1220
+ }
1221
+ }
1222
+
1223
+ private monitorIndexingQueues ( ) {
1224
+ const limit = this . conf . scaling . auto_scale_trigger ;
1225
+ const autoscaleConsumers = { } ;
1226
+ this . checkQueues ( autoscaleConsumers , limit ) . catch ( console . log ) ;
1227
+ if ( ! this . conf . scaling . polling_interval ) {
1228
+ this . conf . scaling . polling_interval = 20000 ;
1229
+ }
1230
+ setInterval ( async ( ) => {
1231
+ await this . checkQueues ( autoscaleConsumers , limit ) ;
1232
+ } , this . conf . scaling . polling_interval ) ;
1175
1233
}
1176
1234
1177
1235
private onPm2Stop ( ) {
@@ -1350,6 +1408,22 @@ export class HyperionMaster {
1350
1408
1351
1409
async runMaster ( ) {
1352
1410
1411
+ // config checks
1412
+ if ( ! this . conf . scaling . max_queue_limit ) {
1413
+ hLog ( `scaling.max_queue_limit is not defined!` ) ;
1414
+ process . exit ( 1 ) ;
1415
+ }
1416
+
1417
+ if ( ! this . conf . scaling . resume_trigger ) {
1418
+ hLog ( `scaling.resume_trigger is not defined!` ) ;
1419
+ process . exit ( 1 ) ;
1420
+ }
1421
+
1422
+ if ( ! this . conf . scaling . block_queue_limit ) {
1423
+ hLog ( `scaling.block_queue_limit is not defined!` ) ;
1424
+ process . exit ( 1 ) ;
1425
+ }
1426
+
1353
1427
this . printMode ( ) ;
1354
1428
1355
1429
// Preview mode - prints only the proposed worker map
@@ -1365,7 +1439,7 @@ export class HyperionMaster {
1365
1439
this . printActiveProds ( ) ;
1366
1440
}
1367
1441
1368
- // ELasticsearch
1442
+ // Elasticsearch
1369
1443
this . client = this . manager . elasticsearchClient ;
1370
1444
try {
1371
1445
const esInfo = await this . client . info ( ) ;
0 commit comments