@@ -27,44 +27,55 @@ const ports = [443, 80, 6005, 6006, 51233, 51234]
2727const protocols = [ 'wss://' , 'ws://' ]
2828const connections : Map < string , WebSocket > = new Map ( )
2929const networkFee : Map < string , FeeVote > = new Map ( )
30+ const validationNetworkDb : Map < string , string > = new Map ( )
3031const CM_INTERVAL = 60 * 60 * 1000
3132const WS_TIMEOUT = 10000
3233const REPORTING_INTERVAL = 15 * 60 * 1000
3334const BACKTRACK_INTERVAL = 30 * 60 * 1000
35+ const BASE_RETRY_DELAY = 1 * 1000
36+ const MAX_RETRY_DELAY = 30 * 1000
3437
3538// The frequent closing codes seen so far after connections established include:
3639// 1008: Policy error: client is too slow. (Most frequent)
3740// 1006: Abnormal Closure: The connection was closed abruptly without a proper handshake or a clean closure.
3841// 1005: No Status Received: An empty or undefined status code is used to indicate no further details about the closure.
3942// Reconnection should happen after seeing these codes for established connections.
4043const CLOSING_CODES = [ 1005 , 1006 , 1008 ]
41- let connectionsInitialized = false
4244let cmStarted = false
4345
4446/**
4547 * Sets the handlers for each WebSocket object.
4648 *
47- * @param ip - The ip address of the node we are trying to reach.
49+ * @param ws_url - The WebSocket address of the node we are trying to reach.
4850 * @param ws - A WebSocket object.
4951 * @param networks - The networks of the node we are trying to reach where it retrieves validations.
5052 * @param isInitialNode - Whether source node is an entry/initial node for the network.
53+ * @param retryCount - Retry count for exponential backoff.
5154 * @returns A Promise that resolves to void once a connection has been created or timeout has occured.
5255 */
56+ // eslint-disable-next-line max-params -- Required here
5357async function setHandlers (
54- ip : string ,
58+ ws_url : string ,
5559 ws : WebSocket ,
5660 networks : string | undefined ,
5761 isInitialNode = false ,
62+ retryCount = 0 ,
5863) : Promise < void > {
5964 const ledger_hashes : string [ ] = [ ]
6065 return new Promise ( function setHandlersPromise ( resolve , _reject ) {
6166 ws . on ( 'open' , ( ) => {
62- if ( connections . has ( ip ) ) {
67+ log . info (
68+ `Websocket connection opened for: ${ ws . url } on ${
69+ networks ?? 'unknown network'
70+ } `,
71+ )
72+
73+ if ( connections . has ( ws . url ) ) {
6374 resolve ( )
6475 return
6576 }
6677 void saveNodeWsUrl ( ws . url , true )
67- connections . set ( ip , ws )
78+ connections . set ( ws . url , ws )
6879 subscribe ( ws )
6980
7081 // Use LedgerEntry to look for amendments that has already been enabled on a network when connections
@@ -103,45 +114,63 @@ async function setHandlers(
103114 networks ,
104115 networkFee ,
105116 ws ,
117+ validationNetworkDb ,
106118 )
107119 }
108120 } )
109121 ws . on ( 'close' , async ( code , reason ) => {
110- const nodeNetworks = networks ?? 'unknown network'
111- if ( connectionsInitialized ) {
112- log . error (
113- `Websocket closed for ${
114- ws . url
115- } on ${ nodeNetworks } with code ${ code } and reason ${ reason . toString (
116- 'utf-8' ,
117- ) } .`,
122+ log . error (
123+ `Websocket closed for ${ ws . url } on ${
124+ networks ?? 'unknown network'
125+ } with code ${ code } and reason ${ reason . toString ( 'utf-8' ) } .`,
126+ )
127+
128+ const delay = BASE_RETRY_DELAY * 2 ** retryCount
129+
130+ if ( CLOSING_CODES . includes ( code ) && delay <= MAX_RETRY_DELAY ) {
131+ log . info (
132+ `Reconnecting to ${ ws . url } on ${
133+ networks ?? 'unknown network'
134+ } after ${ delay } ms...`,
118135 )
119- if ( CLOSING_CODES . includes ( code ) ) {
120- log . info (
121- `Reconnecting to ${ ws . url } on ${ networks ?? 'unknown network' } ...` ,
122- )
136+ // Clean up the old Websocket connection
137+ connections . delete ( ws . url )
138+ ws . terminate ( )
139+ resolve ( )
140+
141+ setTimeout ( async ( ) => {
123142 // Open a new Websocket connection for the same url
124143 const newWS = new WebSocket ( ws . url , { handshakeTimeout : WS_TIMEOUT } )
125- // Clean up the old Websocket connection
126- connections . delete ( ip )
127- ws . terminate ( )
128- resolve ( )
129144
130- await setHandlers ( ip , newWS , networks , isInitialNode )
131- // return since the old websocket connection has already been terminated
132- return
133- }
145+ await setHandlers (
146+ ws_url ,
147+ newWS ,
148+ networks ,
149+ isInitialNode ,
150+ retryCount + 1 ,
151+ )
152+ } , delay )
153+
154+ // return since the old websocket connection has already been terminated
155+ return
134156 }
135- if ( connections . get ( ip ) ?. url === ws . url ) {
136- connections . delete ( ip )
157+
158+ if ( connections . get ( ws . url ) ?. url === ws . url ) {
159+ connections . delete ( ws . url )
137160 void saveNodeWsUrl ( ws . url , false )
138161 }
139162 ws . terminate ( )
140163 resolve ( )
141164 } )
142- ws . on ( 'error' , ( ) => {
143- if ( connections . get ( ip ) ?. url === ws . url ) {
144- connections . delete ( ip )
165+ ws . on ( 'error' , ( err ) => {
166+ log . error (
167+ `Websocket connection error for ${ ws . url } on ${
168+ networks ?? 'unknown network'
169+ } - ${ err . message } `,
170+ )
171+
172+ if ( connections . get ( ws . url ) ?. url === ws . url ) {
173+ connections . delete ( ws . url )
145174 }
146175 ws . terminate ( )
147176 resolve ( )
@@ -169,13 +198,13 @@ async function findConnection(node: WsNode): Promise<void> {
169198 return Promise . resolve ( )
170199 }
171200
172- if ( connections . has ( node . ip ) ) {
201+ if ( Array . from ( connections . keys ( ) ) . some ( ( key ) => key . includes ( node . ip ) ) ) {
173202 return Promise . resolve ( )
174203 }
175204
176205 if ( node . ws_url ) {
177206 const ws = new WebSocket ( node . ws_url , { handshakeTimeout : WS_TIMEOUT } )
178- return setHandlers ( node . ip , ws , node . networks )
207+ return setHandlers ( node . ws_url , ws , node . networks )
179208 }
180209
181210 const promises : Array < Promise < void > > = [ ]
@@ -185,7 +214,7 @@ async function findConnection(node: WsNode): Promise<void> {
185214 const ws = new WebSocket ( url , { handshakeTimeout : WS_TIMEOUT } )
186215 promises . push (
187216 setHandlers (
188- node . ip ,
217+ url ,
189218 ws ,
190219 node . networks ,
191220 networkInitialIps . includes ( node . ip ) ,
@@ -197,13 +226,23 @@ async function findConnection(node: WsNode): Promise<void> {
197226 return Promise . resolve ( )
198227}
199228
229+ async function getValidationNetworkDb ( ) : Promise < void > {
230+ const validatorNetwork : Array < { signing_key : string ; networks : string } > =
231+ await query ( 'validators' ) . select ( 'signing_key' , 'networks' )
232+ for ( const entry of validatorNetwork ) {
233+ validationNetworkDb . set ( entry . signing_key , entry . networks )
234+ }
235+ }
236+
200237/**
201238 * Creates connections to nodes found in the database.
202239 *
203240 * @returns A promise that resolves to void once all possible connections have been created.
204241 */
205242async function createConnections ( ) : Promise < void > {
206243 log . info ( 'Finding Connections...' )
244+ validationNetworkDb . clear ( )
245+ await getValidationNetworkDb ( )
207246 const tenMinutesAgo = new Date ( )
208247 tenMinutesAgo . setMinutes ( tenMinutesAgo . getMinutes ( ) - 10 )
209248
@@ -222,12 +261,12 @@ async function createConnections(): Promise<void> {
222261 } )
223262
224263 const promises : Array < Promise < void > > = [ ]
225- connectionsInitialized = false
264+
226265 nodes . forEach ( ( node : WsNode ) => {
227266 promises . push ( findConnection ( node ) )
228267 } )
229268 await Promise . all ( promises )
230- connectionsInitialized = true
269+
231270 log . info ( `${ connections . size } connections created` )
232271}
233272
0 commit comments