@@ -8,6 +8,7 @@ import {Type} from "../addons/eosjs-native/eosjs-serialize";
8
8
import { debugLog , hLog } from "../helpers/common_functions" ;
9
9
import { createHash } from "crypto" ;
10
10
import flatstr from 'flatstr' ;
11
+ import { Options } from "amqplib" ;
11
12
12
13
const FJS = require ( 'fast-json-stringify' ) ;
13
14
@@ -124,6 +125,9 @@ export default class MainDSWorker extends HyperionWorker {
124
125
125
126
allowedDynamicContracts : Set < string > = new Set < string > ( ) ;
126
127
128
+ backpressureQueue : any [ ] = [ ] ;
129
+ waitToSend = false ;
130
+
127
131
constructor ( ) {
128
132
129
133
super ( ) ;
@@ -290,6 +294,16 @@ export default class MainDSWorker extends HyperionWorker {
290
294
this . ch . consume ( process . env [ 'worker_queue' ] , ( data ) => {
291
295
this . consumerQueue . push ( data ) . catch ( console . log ) ;
292
296
} ) ;
297
+ this . ch . on ( 'drain' , args => {
298
+ this . waitToSend = false ;
299
+ while ( this . backpressureQueue . length > 0 ) {
300
+ const msg = this . backpressureQueue . shift ( ) ;
301
+ const status = this . controlledSendToQueue ( msg . queue , msg . payload , msg . options ) ;
302
+ if ( ! status ) {
303
+ break ;
304
+ }
305
+ }
306
+ } ) ;
293
307
}
294
308
}
295
309
@@ -478,15 +492,15 @@ export default class MainDSWorker extends HyperionWorker {
478
492
hLog ( `${ block_num } was filtered with ${ inline_count } actions!` ) ;
479
493
}
480
494
try {
495
+ trace [ 1 ] . signatures = signatures ;
481
496
this . routeToPool ( trace [ 1 ] , {
482
497
block_num,
483
498
block_id,
484
499
producer,
485
500
ts,
486
501
inline_count,
487
502
filtered,
488
- live : process . env [ 'live_mode' ] ,
489
- signatures
503
+ live : process . env [ 'live_mode' ]
490
504
} ) ;
491
505
} catch ( e ) {
492
506
hLog ( e ) ;
@@ -622,15 +636,33 @@ export default class MainDSWorker extends HyperionWorker {
622
636
}
623
637
624
638
const pool_queue = `${ this . chain } :ds_pool:${ selected_q } ` ;
625
- if ( this . ch_ready ) {
626
- // console.log('selected_q', pool_queue);
627
- this . ch . sendToQueue ( pool_queue , bufferFromJson ( trace , true ) , { headers} ) ;
628
- return true ;
639
+ const payload = bufferFromJson ( trace , true ) ;
640
+
641
+ if ( ! this . waitToSend ) {
642
+ if ( this . ch_ready ) {
643
+ this . controlledSendToQueue ( pool_queue , payload , { headers} ) ;
644
+ return true ;
645
+ } else {
646
+ return false ;
647
+ }
629
648
} else {
649
+ this . backpressureQueue . push ( {
650
+ queue : pool_queue ,
651
+ payload : payload ,
652
+ options : { headers}
653
+ } ) ;
630
654
return false ;
631
655
}
632
656
}
633
657
658
+ controlledSendToQueue ( pool_queue : string , payload : Buffer , options : Options . Publish ) : boolean {
659
+ const enqueueResult = this . ch . sendToQueue ( pool_queue , payload , options ) ;
660
+ if ( ! enqueueResult ) {
661
+ this . waitToSend = true ;
662
+ }
663
+ return enqueueResult ;
664
+ }
665
+
634
666
createSerialBuffer ( inputArray ) {
635
667
return new Serialize . SerialBuffer ( { textEncoder : this . txEnc , textDecoder : this . txDec , array : inputArray } ) ;
636
668
}
@@ -1160,7 +1192,7 @@ export default class MainDSWorker extends HyperionWorker {
1160
1192
let jsonRow = await this . processContractRowNative ( payload , block_num ) ;
1161
1193
1162
1194
if ( jsonRow ?. value && ! jsonRow [ '_blacklisted' ] ) {
1163
- console . log ( jsonRow ) ;
1195
+ debugLog ( jsonRow ) ;
1164
1196
debugLog ( 'Delta DS failed ->>' , jsonRow ) ;
1165
1197
jsonRow = await this . processContractRowNative ( payload , block_num - 1 ) ;
1166
1198
debugLog ( 'Retry with previous ABI ->>' , jsonRow ) ;
@@ -1495,7 +1527,7 @@ export default class MainDSWorker extends HyperionWorker {
1495
1527
} catch ( e ) {
1496
1528
hLog ( `Delta struct [${ key } ] processing error: ${ e . message } ` ) ;
1497
1529
hLog ( e ) ;
1498
- console . log ( data [ 1 ] ) ;
1530
+ hLog ( data [ 1 ] ) ;
1499
1531
}
1500
1532
}
1501
1533
}
0 commit comments