@@ -4,7 +4,24 @@ const plugins = require('../../../plugins/pluginManager.js');
44const log = require ( '../../utils/log.js' ) ( "batcher" ) ;
55const common = require ( '../../utils/common.js' ) ;
66
7-
7+ var batcherStats = {
8+ key : 'BATCHER_STATS' ,
9+ pid : process . pid ,
10+ insert_queued : 0 ,
11+ insert_processing : 0 ,
12+ insert_errored_fallback : 0 ,
13+ insert_errored_no_fallback : 0 ,
14+ insert_errored_no_fallback_last_error : "" ,
15+ update_queued : 0 ,
16+ update_processing : 0 ,
17+ update_errored_fallback : 0 ,
18+ update_errored_no_fallback : 0 ,
19+ update_errored_no_fallback_last_error : "" ,
20+ } ;
21+
22+ setInterval ( function ( ) {
23+ log . i ( '%j' , batcherStats ) ;
24+ } , 10000 ) ;
825/**
926 * Class for batching database insert operations
1027 * @example
@@ -56,13 +73,16 @@ class InsertBatcher {
5673 if ( this . data [ db ] [ collection ] . length ) {
5774 var docs = this . data [ db ] [ collection ] ;
5875 this . data [ db ] [ collection ] = [ ] ;
76+ batcherStats . insert_queued -= docs . length ;
77+ batcherStats . insert_processing += docs . length ;
5978 try {
6079 await new Promise ( ( resolve , reject ) => {
6180 this . dbs [ db ] . collection ( collection ) . insertMany ( docs , { ordered : false , ignore_errors : [ 11000 ] } , function ( err , res ) {
6281 if ( err ) {
6382 reject ( err ) ;
6483 return ;
6584 }
85+ batcherStats . insert_processing -= docs . length ;
6686 resolve ( res ) ;
6787 } ) ;
6888 } ) ;
@@ -75,7 +95,10 @@ class InsertBatcher {
7595 //trying to rollback operations to try again on next iteration
7696 if ( ex . writeErrors && ex . writeErrors . length ) {
7797 for ( let i = 0 ; i < ex . writeErrors . length ; i ++ ) {
98+ batcherStats . insert_processing -- ;
7899 if ( no_fallback_errors . indexOf ( ex . writeErrors [ i ] . code ) !== - 1 ) {
100+ batcherStats . insert_errored_no_fallback ++ ;
101+ batcherStats . insert_errored_no_fallback_last_error = ex . writeErrors [ i ] . errmsg ;
79102 //dispatch failure
80103 if ( notify_fallback_errors . indexOf ( ex . writeErrors [ i ] . code ) !== - 1 ) {
81104 var index0 = ex . writeErrors [ i ] . index ;
@@ -85,6 +108,7 @@ class InsertBatcher {
85108 //we could record in diagnostic data
86109 continue ;
87110 }
111+ batcherStats . insert_errored_fallback ++ ;
88112 let index = ex . writeErrors [ i ] . index ;
89113 if ( docs [ index ] ) {
90114 this . data [ db ] [ collection ] . push ( docs [ index ] ) ;
@@ -136,8 +160,10 @@ class InsertBatcher {
136160 for ( let i = 0 ; i < doc . length ; i ++ ) {
137161 this . data [ db ] [ collection ] . push ( doc [ i ] ) ;
138162 }
163+ batcherStats . insert_queued += doc . length ;
139164 }
140165 else {
166+ batcherStats . insert_queued ++ ;
141167 this . data [ db ] [ collection ] . push ( doc ) ;
142168 }
143169 if ( ! this . process ) {
@@ -212,13 +238,16 @@ class WriteBatcher {
212238 }
213239 }
214240 this . data [ db ] [ collection ] = { } ;
241+ batcherStats . update_queued -= queries . length ;
242+ batcherStats . update_processing += queries . length ;
215243 try {
216244 await new Promise ( ( resolve , reject ) => {
217245 this . dbs [ db ] . collection ( collection ) . bulkWrite ( queries , { ordered : false , ignore_errors : [ 11000 ] } , function ( err , res ) {
218246 if ( err ) {
219247 reject ( err ) ;
220248 return ;
221249 }
250+ batcherStats . update_processing -= queries . length ;
222251 resolve ( res ) ;
223252 } ) ;
224253 } ) ;
@@ -231,8 +260,10 @@ class WriteBatcher {
231260 //trying to rollback operations to try again on next iteration
232261 if ( ex . writeErrors && ex . writeErrors . length ) {
233262 for ( let i = 0 ; i < ex . writeErrors . length ; i ++ ) {
234-
263+ batcherStats . update_processing -- ;
235264 if ( no_fallback_errors . indexOf ( ex . writeErrors [ i ] . code ) !== - 1 ) {
265+ batcherStats . update_errored_no_fallback ++ ;
266+ batcherStats . update_errored_no_fallback_last_error = ex . writeErrors [ i ] . errmsg ;
236267 //dispatch failure
237268 if ( notify_errors . indexOf ( ex . writeErrors [ i ] . code ) !== - 1 ) {
238269 var index0 = ex . writeErrors [ i ] . index ;
@@ -242,6 +273,7 @@ class WriteBatcher {
242273 //we could record in diagnostic data
243274 continue ;
244275 }
276+ batcherStats . update_errored_fallback ++ ;
245277 let index = ex . writeErrors [ i ] . index ;
246278 if ( queries [ index ] ) {
247279 //if we don't have anything for this document yet just use query
@@ -316,6 +348,7 @@ class WriteBatcher {
316348 }
317349 if ( ! this . data [ db ] [ collection ] [ id ] ) {
318350 this . data [ db ] [ collection ] [ id ] = { id : id , value : operation } ;
351+ batcherStats . update_queued ++ ;
319352 }
320353 else {
321354 this . data [ db ] [ collection ] [ id ] . value = common . mergeQuery ( this . data [ db ] [ collection ] [ id ] . value , operation ) ;
@@ -653,4 +686,4 @@ function getId() {
653686 return crypto . randomBytes ( 16 ) . toString ( "hex" ) ;
654687}
655688
656- module . exports = { WriteBatcher, ReadBatcher, InsertBatcher} ;
689+ module . exports = { WriteBatcher, ReadBatcher, InsertBatcher} ;
0 commit comments