@@ -447,10 +447,17 @@ LIMIT_MAKER
447447 let context = _depthCacheContext [ symbol ] ;
448448 // This now conforms 100% to the Binance docs constraints on managing a local order book
449449 if ( ! context . lastEventUpdateId && ( depth . U > context . snapshotUpdateId + 1 || depth . u < context . snapshotUpdateId + 1 ) ) {
450- options . log ( 'depthHandler :' + symbol + ': Unexpected first depth event. Skipping. ' +
451- '!! IF this persists, the "' + symbol + '" cache is OUT OF SYNC !! ' ) ;
450+ // I think if the count exceeded 1 we could deem the cache out of sync. But we'll
451+ // be lenient and give the cache up to a count of 3 before calling it out of sync.
452+ if ( ++ context . skipCount > 2 ) {
453+ const msg = 'depthHandler: [' + symbol + '] Skip count exceeded. The depth cache is out of sync.' ;
454+ if ( options . verbose ) options . log ( msg ) ;
455+ throw new Error ( msg ) ;
456+ }
452457 } else if ( context . lastEventUpdateId && depth . U !== context . lastEventUpdateId + 1 ) {
453- options . log ( 'depthHandler :' + symbol + ': !! DEPTH CACHE OUT OF SYNC !!' ) ;
458+ const msg = 'depthHandler: [' + symbol + '] Incorrect update ID. The depth cache is out of sync.' ;
459+ if ( options . verbose ) options . log ( msg ) ;
460+ throw new Error ( msg ) ;
454461 } else {
455462 for ( obj of depth . b ) { //bids
456463 depthCache [ symbol ] . bids [ obj [ 0 ] ] = parseFloat ( obj [ 1 ] ) ;
@@ -464,6 +471,7 @@ LIMIT_MAKER
464471 delete depthCache [ symbol ] . asks [ obj [ 0 ] ] ;
465472 }
466473 }
474+ context . skipCount = 0 ;
467475 context . lastEventUpdateId = depth . u ;
468476 }
469477 } ;
@@ -873,6 +881,7 @@ LIMIT_MAKER
873881 let context = _depthCacheContext [ symbol ] ;
874882 context . snapshotUpdateId = undefined ;
875883 context . lastEventUpdateId = undefined ;
884+ context . skipCount = 0 ;
876885 context . messageQueue = [ ] ;
877886
878887 depthCache [ symbol ] = { bids : { } , asks : { } } ;
@@ -884,8 +893,12 @@ LIMIT_MAKER
884893 if ( context . messageQueue && ! context . snapshotUpdateId ) {
885894 context . messageQueue . push ( depth ) ;
886895 } else {
887- depthHandler ( depth ) ;
888- if ( callback ) callback ( symbol , depthCache [ symbol ] ) ;
896+ try {
897+ depthHandler ( depth ) ;
898+ if ( callback ) callback ( symbol , depthCache [ symbol ] ) ;
899+ } catch ( err ) {
900+ reconnect ( ) ;
901+ }
889902 }
890903 } ;
891904
@@ -898,8 +911,12 @@ LIMIT_MAKER
898911 context . snapshotUpdateId = json . lastUpdateId ;
899912 context . messageQueue = context . messageQueue . filter ( depth => depth . u > context . snapshotUpdateId ) ;
900913 // Process any pending depth messages
901- for ( let depth of context . messageQueue )
902- depthHandler ( depth , json . lastUpdateId ) ;
914+ for ( let depth of context . messageQueue ) {
915+ // Although sync errors shouldn't ever happen here, we catch and swallow them anyway
916+ // just in case. The stream handler function above will deal with broken caches.
917+ try { depthHandler ( depth ) ; }
918+ catch ( err ) { }
919+ }
903920 delete context . messageQueue ;
904921 if ( callback ) callback ( symbol , depthCache [ symbol ] ) ;
905922 } ) ;
0 commit comments