Skip to content
This repository was archived by the owner on Mar 14, 2025. It is now read-only.

Commit aa0f5ce

Browse files
authored
BCFR-1076 false unreachable RPC transition on rotation (#1548)
## Motivation In cases when lease duration > 0 MultiNode periodically rotates RPCs. As we have not marked the finalized head subscription as part of aliveLoopSub on RPC rotation, the subscription was closed, which caused RPC to transition to an unreachable state. ## Solution Mark finalized subscription as part of alive loop to avoid subscription termination on RPC rotation.
1 parent ff30f81 commit aa0f5ce

File tree

8 files changed

+139
-4
lines changed

8 files changed

+139
-4
lines changed

common/client/mock_node_client_test.go

Lines changed: 33 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/client/mock_rpc_test.go

Lines changed: 33 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/client/node_lifecycle.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
119119
}
120120

121121
defer finalizedHeadsSub.Unsubscribe()
122+
n.rpc.SetAliveLoopFinalizedHeadSub(finalizedHeadsSub.sub)
122123
}
123124

124125
var pollCh <-chan time.Time

common/client/node_lifecycle_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
446446
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
447447
rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), newSub(t), nil).Once()
448448
rpc.On("SetAliveLoopSub", mock.Anything).Once()
449+
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
449450
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
450451
node := newDialedNode(t, testNodeOpts{
451452
config: testNodeConfig{},
@@ -467,6 +468,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
467468
ch := make(chan Head)
468469
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), newSub(t), nil).Once()
469470
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
471+
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
470472
name := "node-" + rand.Str(5)
471473
node := newSubscribedNode(t, testNodeOpts{
472474
config: testNodeConfig{},
@@ -501,6 +503,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
501503
ch := make(chan Head)
502504
close(ch)
503505
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), newSub(t), nil).Once()
506+
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
504507
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
505508
node := newSubscribedNode(t, testNodeOpts{
506509
chainConfig: clientMocks.ChainConfig{
@@ -527,6 +530,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
527530
ch := make(chan Head, 1)
528531
ch <- head{BlockNumber: 10}.ToMockHead(t)
529532
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), newSub(t), nil).Once()
533+
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
530534
lggr, observed := logger.TestObserved(t, zap.DebugLevel)
531535
noNewFinalizedHeadsThreshold := tests.TestInterval
532536
node := newSubscribedNode(t, testNodeOpts{
@@ -560,6 +564,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
560564
rpc := newMockNodeClient[types.ID, Head](t)
561565
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
562566
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), newSub(t), nil).Once()
567+
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
563568
lggr, observed := logger.TestObserved(t, zap.DebugLevel)
564569
noNewFinalizedHeadsThreshold := tests.TestInterval
565570
node := newSubscribedNode(t, testNodeOpts{
@@ -593,6 +598,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
593598
sub.On("Err").Return((<-chan error)(errCh))
594599
sub.On("Unsubscribe").Once()
595600
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(nil), sub, nil).Once()
601+
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
596602
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
597603
node := newSubscribedNode(t, testNodeOpts{
598604
chainConfig: clientMocks.ChainConfig{
@@ -1116,6 +1122,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
11161122
outOfSyncSubscription.On("Unsubscribe").Once()
11171123
ch := make(chan Head)
11181124
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), outOfSyncSubscription, nil).Once()
1125+
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
11191126

11201127
setupRPCForAliveLoop(t, rpc)
11211128

common/client/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type NodeClient[
6666
ClientVersion(context.Context) (string, error)
6767
SubscribersCount() int32
6868
SetAliveLoopSub(types.Subscription)
69+
SetAliveLoopFinalizedHeadSub(types.Subscription)
6970
UnsubscribeAllExceptAliveLoop()
7071
IsSyncing(ctx context.Context) (bool, error)
7172
SubscribeToFinalizedHeads(_ context.Context) (<-chan HEAD, types.Subscription, error)

core/chains/evm/client/mocks/rpc_client.go

Lines changed: 33 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/chains/evm/client/rpc_client.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ type rpcClient struct {
137137
subs []ethereum.Subscription
138138

139139
// Need to track the aliveLoop subscription, so we do not cancel it when checking lease on the MultiNode
140-
aliveLoopSub ethereum.Subscription
140+
aliveLoopHeadsSub ethereum.Subscription
141+
aliveLoopFinalizedHeadsSub ethereum.Subscription
141142

142143
// chStopInFlight can be closed to immediately cancel all in-flight requests on
143144
// this rpcClient. Closing and replacing should be serialized through
@@ -368,11 +369,18 @@ func (r *rpcClient) unsubscribeAll() {
368369
}
369370
r.subs = nil
370371
}
371-
func (r *rpcClient) SetAliveLoopSub(sub commontypes.Subscription) {
372+
func (r *rpcClient) SetAliveLoopSub(headsSub commontypes.Subscription) {
372373
r.stateMu.Lock()
373374
defer r.stateMu.Unlock()
374375

375-
r.aliveLoopSub = sub
376+
r.aliveLoopHeadsSub = headsSub
377+
}
378+
379+
func (r *rpcClient) SetAliveLoopFinalizedHeadSub(finalizedHeads commontypes.Subscription) {
380+
r.stateMu.Lock()
381+
defer r.stateMu.Unlock()
382+
383+
r.aliveLoopFinalizedHeadsSub = finalizedHeads
376384
}
377385

378386
// SubscribersCount returns the number of client subscribed to the node
@@ -389,7 +397,7 @@ func (r *rpcClient) UnsubscribeAllExceptAliveLoop() {
389397
defer r.stateMu.Unlock()
390398

391399
for _, s := range r.subs {
392-
if s != r.aliveLoopSub {
400+
if s != r.aliveLoopHeadsSub && s != r.aliveLoopFinalizedHeadsSub {
393401
s.Unsubscribe()
394402
}
395403
}

core/chains/evm/client/rpc_client_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,25 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) {
318318
require.NoError(t, err)
319319
checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc)
320320
})
321+
t.Run("UnsubscribeAllExceptAliveLoop should keep finalized heads subscription open", func(t *testing.T) {
322+
server := testutils.NewWSServer(t, chainId, serverCallBack)
323+
wsURL := server.WSURL()
324+
325+
rpc := client.NewRPCClient(lggr, wsURL, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 1, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
326+
defer rpc.Close()
327+
require.NoError(t, rpc.Dial(ctx))
328+
329+
_, sub, err := rpc.SubscribeToFinalizedHeads(tests.Context(t))
330+
require.NoError(t, err)
331+
rpc.SetAliveLoopFinalizedHeadSub(sub)
332+
rpc.UnsubscribeAllExceptAliveLoop()
333+
select {
334+
case <-sub.Err():
335+
t.Fatal("Expected subscription to remain open")
336+
default:
337+
}
338+
checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc)
339+
})
321340
t.Run("Subscription error is properly wrapper", func(t *testing.T) {
322341
server := testutils.NewWSServer(t, chainId, serverCallBack)
323342
wsURL := server.WSURL()

0 commit comments

Comments
 (0)