Skip to content

Commit 86c00f6

Browse files
author
Igor Lins e Silva
committed
[indexer] flow control limiter fixed
fixes an issue where the state reader would get stuck on a artificial flow rate
1 parent 3a3e6ac commit 86c00f6

File tree

2 files changed

+57
-53
lines changed

2 files changed

+57
-53
lines changed

modules/master.ts

+51-44
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ export class HyperionMaster {
135135
private wsRouterWorker: cluster.Worker;
136136
private liveBlockQueue: AsyncQueue<any>;
137137
private readingPaused = false;
138+
private readingLimited = false;
138139

139140
// Hyperion Hub Socket
140141
private hub: SocketIOClient.Socket;
@@ -1178,74 +1179,80 @@ export class HyperionMaster {
11781179
queue = `${this.chain}:ds_pool:${worker.local_id}`;
11791180
}
11801181

1181-
if (queue) {
1182-
if (!testedQueues.has(queue)) {
1183-
testedQueues.add(queue);
1182+
if (queue && !testedQueues.has(queue)) {
11841183

1185-
const size = await this.manager.checkQueueSize(queue);
1184+
const size = await this.manager.checkQueueSize(queue);
11861185

1186+
// pause readers if queues are above the max_limit
1187+
if (size >= this.conf.scaling.max_queue_limit) {
1188+
this.readingPaused = true;
1189+
for (const worker of this.workerMap) {
1190+
if (worker.worker_role === 'reader') {
1191+
worker.wref.send({event: 'pause'});
1192+
}
1193+
}
1194+
}
1195+
1196+
// resume readers if the queues are below the trigger point
1197+
if (size <= this.conf.scaling.resume_trigger) {
11871198

1188-
// pause readers if queues are above the max_limit
1189-
if (size >= this.conf.scaling.max_queue_limit) {
1190-
this.readingPaused = true;
1199+
// remove flow limiter
1200+
if (this.readingLimited) {
1201+
this.readingLimited = false;
11911202
for (const worker of this.workerMap) {
11921203
if (worker.worker_role === 'reader') {
1193-
worker.wref.send({event: 'pause'});
1204+
worker.wref.send({event: 'set_delay', data: {state: false, delay: 0}});
11941205
}
11951206
}
11961207
}
11971208

1198-
// resume readers if the queues are below the trigger point
1199-
if ((this.readingPaused && size <= this.conf.scaling.resume_trigger)) {
1209+
// fully unpause
1210+
if (this.readingPaused) {
12001211
this.readingPaused = false;
12011212
for (const worker of this.workerMap) {
12021213
if (worker.worker_role === 'reader') {
12031214
worker.wref.send({event: 'pause'});
1204-
worker.wref.send({
1205-
event: 'set_delay',
1206-
data: {
1207-
state: false,
1208-
delay: 0
1209-
}
1210-
});
12111215
}
12121216
}
12131217
}
1218+
}
12141219

1215-
// apply block processing delay if 20% below max
1216-
if (size >= this.conf.scaling.max_queue_limit * 0.8) {
1217-
for (const worker of this.workerMap) {
1218-
if (worker.worker_role === 'reader') {
1219-
worker.wref.send({
1220-
event: 'set_delay',
1221-
data: {
1222-
state: true,
1223-
delay: 250
1224-
}
1225-
});
1226-
}
1220+
// apply block processing delay on 80% usage
1221+
if (size >= this.conf.scaling.max_queue_limit * 0.8) {
1222+
this.readingLimited = true;
1223+
for (const worker of this.workerMap) {
1224+
if (worker.worker_role === 'reader') {
1225+
worker.wref.send({
1226+
event: 'set_delay',
1227+
data: {
1228+
state: true,
1229+
delay: 250
1230+
}
1231+
});
12271232
}
12281233
}
1234+
}
12291235

12301236

1231-
if (worker.worker_role === 'ingestor') {
1232-
if (size > limit) {
1233-
if (!autoscaleConsumers[queue]) {
1234-
autoscaleConsumers[queue] = 0;
1235-
}
1236-
if (autoscaleConsumers[queue] < this.conf.scaling.max_autoscale) {
1237-
hLog(`${queue} is above the limit (${size}/${limit}). Launching consumer...`);
1238-
this.addWorker({
1239-
queue: queue,
1240-
type: worker.type,
1241-
worker_role: 'ingestor'
1242-
});
1243-
this.launchWorkers();
1244-
autoscaleConsumers[queue]++;
1245-
}
1237+
if (worker.worker_role === 'ingestor') {
1238+
if (size > limit) {
1239+
if (!autoscaleConsumers[queue]) {
1240+
autoscaleConsumers[queue] = 0;
1241+
}
1242+
if (autoscaleConsumers[queue] < this.conf.scaling.max_autoscale) {
1243+
hLog(`${queue} is above the limit (${size}/${limit}). Launching consumer...`);
1244+
this.addWorker({
1245+
queue: queue,
1246+
type: worker.type,
1247+
worker_role: 'ingestor'
1248+
});
1249+
this.launchWorkers();
1250+
autoscaleConsumers[queue]++;
12461251
}
12471252
}
12481253
}
1254+
1255+
testedQueues.add(queue);
12491256
}
12501257
}
12511258
}

workers/state-reader.ts

+6-9
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ export default class StateReader extends HyperionWorker {
3131
private currentIdx = 1;
3232
private receivedFirstBlock = false;
3333
private local_lib = 0;
34-
private delay_block_processing = false;
34+
private delay_active = false;
3535
private block_processing_delay = 100;
3636

3737
// private tempBlockSizeSum = 0;
@@ -49,13 +49,7 @@ export default class StateReader extends HyperionWorker {
4949

5050
this.blockReadingQueue = cargo((tasks, next) => {
5151
this.processIncomingBlocks(tasks).then(() => {
52-
if (this.delay_block_processing) {
53-
setTimeout(() => {
54-
next();
55-
}, this.block_processing_delay);
56-
} else {
57-
next();
58-
}
52+
next();
5953
}).catch((err) => {
6054
console.log('FATAL ERROR READING BLOCKS', err);
6155
process.exit(1);
@@ -213,7 +207,7 @@ export default class StateReader extends HyperionWorker {
213207
break;
214208
}
215209
case 'set_delay': {
216-
this.delay_block_processing = msg.data.state;
210+
this.delay_active = msg.data.state;
217211
this.block_processing_delay = msg.data.delay;
218212
break;
219213
}
@@ -282,6 +276,9 @@ export default class StateReader extends HyperionWorker {
282276
} else {
283277
await this.onMessage(block_array[0]);
284278
}
279+
if (this.delay_active) {
280+
await new Promise(resolve => setTimeout(resolve, this.block_processing_delay));
281+
}
285282
return true;
286283
}
287284

0 commit comments

Comments
 (0)