Skip to content

Commit 500bbbb

Browse files
refactoring after peer review
1 parent 0232f80 commit 500bbbb

File tree

15 files changed

+503
-274
lines changed

15 files changed

+503
-274
lines changed

.mockery.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ packages:
115115
interfaces:
116116
EclipseCheckUseCase:
117117
UpdateBtcReleaseUseCase:
118-
NodePeerAlertUseCase:
118+
NodePeerCheckUseCase:
119119
github.com/rsksmart/liquidity-provider-server/internal/adapters/entrypoints/watcher/monitoring:
120120
interfaces:
121121
GetAssetReportUseCase:

cmd/application/lps/application.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ func (app *Application) prepareWatchers() ([]watcher.Watcher, error) {
175175
app.watcherRegistry.BitcoinPeerWatcher,
176176
app.watcherRegistry.RootstockPeerWatcher,
177177
app.watcherRegistry.QuoteMetricsWatcher,
178+
app.watcherRegistry.PeerMetricsWatcher,
178179
app.watcherRegistry.AssetReportWatcher,
179180
}
180181

internal/adapters/entrypoints/watcher/bitcoin_peer_watcher.go

Lines changed: 11 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -4,48 +4,33 @@ import (
44
"context"
55
"time"
66

7-
"github.com/rsksmart/liquidity-provider-server/internal/adapters/entrypoints/watcher/monitoring"
87
"github.com/rsksmart/liquidity-provider-server/internal/entities"
9-
"github.com/rsksmart/liquidity-provider-server/internal/entities/blockchain"
108
"github.com/rsksmart/liquidity-provider-server/internal/entities/utils"
119
log "github.com/sirupsen/logrus"
1210
)
1311

14-
type NodePeerAlertUseCase interface {
15-
Run(ctx context.Context, nodeType entities.NodeType, currentPeers int64, minPeers uint64) error
12+
type NodePeerCheckUseCase interface {
13+
Run(ctx context.Context, nodeType entities.NodeType) error
1614
}
1715

1816
type BitcoinPeerWatcher struct {
19-
rpc blockchain.Rpc
20-
peerAlertUseCase NodePeerAlertUseCase
17+
peerCheckUseCase NodePeerCheckUseCase
2118
ticker utils.Ticker
2219
watcherStopChannel chan struct{}
23-
minPeers uint64
2420
validationTimeout time.Duration
25-
metrics *monitoring.Metrics
26-
alertCooldown time.Duration
27-
lastAlertTime time.Time
2821
}
2922

3023
func NewBitcoinPeerWatcher(
31-
rpc blockchain.Rpc,
32-
peerAlertUseCase NodePeerAlertUseCase,
24+
peerCheckUseCase NodePeerCheckUseCase,
3325
ticker utils.Ticker,
34-
minPeers uint64,
3526
validationTimeout time.Duration,
36-
alertCooldown time.Duration,
37-
metrics *monitoring.Metrics,
3827
) *BitcoinPeerWatcher {
3928
watcherStopChannel := make(chan struct{}, 1)
4029
return &BitcoinPeerWatcher{
41-
rpc: rpc,
42-
peerAlertUseCase: peerAlertUseCase,
30+
peerCheckUseCase: peerCheckUseCase,
4331
ticker: ticker,
4432
watcherStopChannel: watcherStopChannel,
45-
minPeers: minPeers,
4633
validationTimeout: validationTimeout,
47-
alertCooldown: alertCooldown,
48-
metrics: metrics,
4934
}
5035
}
5136

@@ -56,7 +41,12 @@ watcherLoop:
5641
for {
5742
select {
5843
case <-watcher.ticker.C():
59-
watcher.checkPeers()
44+
ctx, cancel := context.WithTimeout(context.Background(), watcher.validationTimeout)
45+
err := watcher.peerCheckUseCase.Run(ctx, entities.NodeTypeBitcoin)
46+
cancel()
47+
if err != nil {
48+
log.Error("BitcoinPeerWatcher: error running peer check: ", err)
49+
}
6050
case <-watcher.watcherStopChannel:
6151
watcher.ticker.Stop()
6252
close(watcher.watcherStopChannel)
@@ -65,32 +55,6 @@ watcherLoop:
6555
}
6656
}
6757

68-
func (watcher *BitcoinPeerWatcher) checkPeers() {
69-
ctx, cancel := context.WithTimeout(context.Background(), watcher.validationTimeout)
70-
defer cancel()
71-
currentPeers, err := watcher.rpc.Btc.GetConnectionCount()
72-
if err != nil {
73-
log.Error("BitcoinPeerWatcher: error getting connection count: ", err)
74-
watcher.metrics.IncrementNodePeerCheckError(entities.NodeTypeBitcoin)
75-
return
76-
}
77-
belowThreshold := uint64(currentPeers) < watcher.minPeers
78-
watcher.metrics.UpdateNodePeerStatus(entities.NodeTypeBitcoin, float64(currentPeers), float64(watcher.minPeers), belowThreshold)
79-
if !belowThreshold {
80-
return
81-
}
82-
log.Warnf("BitcoinPeerWatcher: peer count %d is below minimum %d", currentPeers, watcher.minPeers)
83-
if time.Since(watcher.lastAlertTime) < watcher.alertCooldown {
84-
return
85-
}
86-
if alertErr := watcher.peerAlertUseCase.Run(ctx, entities.NodeTypeBitcoin, currentPeers, watcher.minPeers); alertErr != nil {
87-
log.Error("BitcoinPeerWatcher: error sending low peer alert: ", alertErr)
88-
} else {
89-
watcher.lastAlertTime = time.Now()
90-
watcher.metrics.IncrementNodePeerAlert(entities.NodeTypeBitcoin)
91-
}
92-
}
93-
9458
func (watcher *BitcoinPeerWatcher) Shutdown(closeChannel chan<- bool) {
9559
watcher.watcherStopChannel <- struct{}{}
9660
closeChannel <- true

internal/adapters/entrypoints/watcher/bitcoin_peer_watcher_test.go

Lines changed: 24 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,8 @@ import (
44
"testing"
55
"time"
66

7-
"github.com/prometheus/client_golang/prometheus"
87
w "github.com/rsksmart/liquidity-provider-server/internal/adapters/entrypoints/watcher"
9-
"github.com/rsksmart/liquidity-provider-server/internal/adapters/entrypoints/watcher/monitoring"
108
"github.com/rsksmart/liquidity-provider-server/internal/entities"
11-
"github.com/rsksmart/liquidity-provider-server/internal/entities/blockchain"
129
"github.com/rsksmart/liquidity-provider-server/internal/entities/utils"
1310
"github.com/rsksmart/liquidity-provider-server/test/mocks"
1411
"github.com/stretchr/testify/assert"
@@ -17,47 +14,40 @@ import (
1714

1815
func TestBitcoinPeerWatcher_Shutdown(t *testing.T) {
1916
createWatcherShutdownTest(t, func(ticker utils.Ticker) w.Watcher {
20-
btcMock := &mocks.BtcRpcMock{}
21-
useCase := &mocks.NodePeerAlertUseCaseMock{}
22-
rpc := blockchain.Rpc{Btc: btcMock}
23-
appMetrics := monitoring.NewMetrics(prometheus.NewRegistry())
24-
return w.NewBitcoinPeerWatcher(rpc, useCase, ticker, 3, 15*time.Second, 30*time.Minute, appMetrics)
17+
useCase := &mocks.NodePeerCheckUseCaseMock{}
18+
return w.NewBitcoinPeerWatcher(useCase, ticker, 15*time.Second)
2519
})
2620
}
2721

28-
// nolint:funlen
2922
func TestBitcoinPeerWatcher_Start(t *testing.T) {
30-
t.Run("should alert when peer count is below threshold", func(t *testing.T) {
31-
runBitcoinPeerSubtest(t, int64(1), nil, 3, true, nil)
32-
})
33-
t.Run("should not alert when peer count is at or above threshold", func(t *testing.T) {
34-
runBitcoinPeerSubtest(t, int64(5), nil, 3, false, nil)
35-
})
36-
t.Run("should not alert when peer count equals threshold", func(t *testing.T) {
37-
runBitcoinPeerSubtest(t, int64(3), nil, 3, false, nil)
38-
})
39-
t.Run("should not alert when minPeers is zero", func(t *testing.T) {
40-
runBitcoinPeerSubtest(t, int64(0), nil, 0, false, nil)
41-
})
42-
t.Run("should continue running on RPC error", func(t *testing.T) {
43-
runBitcoinPeerSubtest(t, int64(0), assert.AnError, 3, false, nil)
44-
})
45-
t.Run("should continue running on alert send error", func(t *testing.T) {
46-
runBitcoinPeerSubtest(t, int64(1), nil, 3, true, assert.AnError)
23+
t.Run("should call use case on tick with bitcoin node type", func(t *testing.T) {
24+
useCase := &mocks.NodePeerCheckUseCaseMock{}
25+
ticker := &mocks.TickerMock{}
26+
tickerChannel := make(chan time.Time)
27+
closeChannel := make(chan bool)
28+
ticker.EXPECT().C().Return(tickerChannel)
29+
ticker.EXPECT().Stop().Return().Once()
30+
useCase.EXPECT().Run(mock.Anything, entities.NodeTypeBitcoin).Return(nil).Once()
31+
watcher := w.NewBitcoinPeerWatcher(useCase, ticker, 15*time.Second)
32+
go watcher.Start()
33+
tickerChannel <- time.Now()
34+
go watcher.Shutdown(closeChannel)
35+
<-closeChannel
36+
assert.Eventually(t, func() bool {
37+
return ticker.AssertExpectations(t) && useCase.AssertExpectations(t)
38+
}, time.Second, 100*time.Millisecond)
4739
})
48-
t.Run("should suppress alert during cooldown period", func(t *testing.T) {
49-
btcMock := &mocks.BtcRpcMock{}
50-
useCase := &mocks.NodePeerAlertUseCaseMock{}
40+
41+
t.Run("should continue running on use case error", func(t *testing.T) {
42+
useCase := &mocks.NodePeerCheckUseCaseMock{}
5143
ticker := &mocks.TickerMock{}
5244
tickerChannel := make(chan time.Time)
5345
closeChannel := make(chan bool)
5446
ticker.EXPECT().C().Return(tickerChannel)
5547
ticker.EXPECT().Stop().Return().Once()
56-
btcMock.On("GetConnectionCount").Return(int64(1), nil)
57-
useCase.EXPECT().Run(mock.Anything, entities.NodeTypeBitcoin, int64(1), uint64(3)).Return(nil).Once()
58-
rpc := blockchain.Rpc{Btc: btcMock}
59-
appMetrics := monitoring.NewMetrics(prometheus.NewRegistry())
60-
watcher := w.NewBitcoinPeerWatcher(rpc, useCase, ticker, 3, 15*time.Second, time.Hour, appMetrics)
48+
useCase.EXPECT().Run(mock.Anything, entities.NodeTypeBitcoin).Return(assert.AnError).Once()
49+
useCase.EXPECT().Run(mock.Anything, entities.NodeTypeBitcoin).Return(nil).Once()
50+
watcher := w.NewBitcoinPeerWatcher(useCase, ticker, 15*time.Second)
6151
go watcher.Start()
6252
tickerChannel <- time.Now()
6353
tickerChannel <- time.Now()
@@ -68,31 +58,3 @@ func TestBitcoinPeerWatcher_Start(t *testing.T) {
6858
}, time.Second, 100*time.Millisecond)
6959
})
7060
}
71-
72-
func runBitcoinPeerSubtest(t *testing.T, peerCount int64, rpcErr error, minPeers uint64, expectAlert bool, alertErr error) {
73-
t.Helper()
74-
btcMock := &mocks.BtcRpcMock{}
75-
useCase := &mocks.NodePeerAlertUseCaseMock{}
76-
ticker := &mocks.TickerMock{}
77-
tickerChannel := make(chan time.Time)
78-
closeChannel := make(chan bool)
79-
ticker.EXPECT().C().Return(tickerChannel)
80-
ticker.EXPECT().Stop().Return().Once()
81-
btcMock.On("GetConnectionCount").Return(peerCount, rpcErr).Once()
82-
if expectAlert {
83-
useCase.EXPECT().Run(mock.Anything, entities.NodeTypeBitcoin, peerCount, minPeers).Return(alertErr).Once()
84-
}
85-
rpc := blockchain.Rpc{Btc: btcMock}
86-
appMetrics := monitoring.NewMetrics(prometheus.NewRegistry())
87-
watcher := w.NewBitcoinPeerWatcher(rpc, useCase, ticker, minPeers, 15*time.Second, 30*time.Minute, appMetrics)
88-
go watcher.Start()
89-
tickerChannel <- time.Now()
90-
go watcher.Shutdown(closeChannel)
91-
<-closeChannel
92-
assert.Eventually(t, func() bool {
93-
return ticker.AssertExpectations(t) && btcMock.AssertExpectations(t)
94-
}, time.Second, 100*time.Millisecond)
95-
if !expectAlert {
96-
useCase.AssertNotCalled(t, "Run", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
97-
}
98-
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package monitoring
2+
3+
import (
4+
"context"
5+
6+
"github.com/rsksmart/liquidity-provider-server/internal/entities"
7+
"github.com/rsksmart/liquidity-provider-server/internal/entities/blockchain"
8+
log "github.com/sirupsen/logrus"
9+
)
10+
11+
type PeerMetricsWatcher struct {
12+
appMetrics *Metrics
13+
eventBus entities.EventBus
14+
closeChannel chan struct{}
15+
}
16+
17+
func NewPeerMetricsWatcher(appMetrics *Metrics, eventBus entities.EventBus) *PeerMetricsWatcher {
18+
closeChannel := make(chan struct{}, 1)
19+
return &PeerMetricsWatcher{
20+
appMetrics: appMetrics,
21+
eventBus: eventBus,
22+
closeChannel: closeChannel,
23+
}
24+
}
25+
26+
func (watcher *PeerMetricsWatcher) Prepare(ctx context.Context) error { return nil }
27+
28+
func (watcher *PeerMetricsWatcher) Start() {
29+
peerCheckChannel := watcher.eventBus.Subscribe(blockchain.NodePeerCheckEventId)
30+
peerCheckErrorChannel := watcher.eventBus.Subscribe(blockchain.NodePeerCheckErrorEventId)
31+
peerAlertSentChannel := watcher.eventBus.Subscribe(blockchain.NodePeerAlertSentEventId)
32+
33+
metricLoop:
34+
for {
35+
select {
36+
case event := <-peerCheckChannel:
37+
if peerEvent, ok := event.(blockchain.NodePeerCheckEvent); ok {
38+
watcher.appMetrics.UpdateNodePeerStatus(
39+
string(peerEvent.NodeType),
40+
float64(peerEvent.CurrentPeers),
41+
float64(peerEvent.MinPeers),
42+
peerEvent.BelowThreshold,
43+
)
44+
}
45+
case event := <-peerCheckErrorChannel:
46+
if peerErrorEvent, ok := event.(blockchain.NodePeerCheckErrorEvent); ok {
47+
watcher.appMetrics.IncrementNodePeerCheckError(string(peerErrorEvent.NodeType))
48+
}
49+
case event := <-peerAlertSentChannel:
50+
if peerAlertEvent, ok := event.(blockchain.NodePeerAlertSentEvent); ok {
51+
watcher.appMetrics.IncrementNodePeerAlert(string(peerAlertEvent.NodeType))
52+
}
53+
case <-watcher.closeChannel:
54+
close(watcher.closeChannel)
55+
break metricLoop
56+
}
57+
}
58+
}
59+
60+
func (watcher *PeerMetricsWatcher) Shutdown(closeChannel chan<- bool) {
61+
watcher.closeChannel <- struct{}{}
62+
closeChannel <- true
63+
log.Debug("Peer metrics watcher shutdown completed")
64+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package monitoring_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/prometheus/client_golang/prometheus"
9+
"github.com/rsksmart/liquidity-provider-server/internal/adapters/entrypoints/watcher/monitoring"
10+
"github.com/rsksmart/liquidity-provider-server/internal/entities"
11+
"github.com/rsksmart/liquidity-provider-server/internal/entities/blockchain"
12+
"github.com/rsksmart/liquidity-provider-server/test/mocks"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
func TestPeerMetricsWatcher_Prepare(t *testing.T) {
18+
watcher := monitoring.NewPeerMetricsWatcher(monitoring.NewMetrics(prometheus.NewRegistry()), &mocks.EventBusMock{})
19+
require.NoError(t, watcher.Prepare(context.Background()))
20+
}
21+
22+
func TestPeerMetricsWatcher_Start(t *testing.T) {
23+
appMetrics := monitoring.NewMetrics(prometheus.NewRegistry())
24+
eventBus := &mocks.EventBusMock{}
25+
26+
checkChannel := make(chan entities.Event, 1)
27+
errorChannel := make(chan entities.Event, 1)
28+
alertChannel := make(chan entities.Event, 1)
29+
eventBus.On("Subscribe", blockchain.NodePeerCheckEventId).Return((<-chan entities.Event)(checkChannel))
30+
eventBus.On("Subscribe", blockchain.NodePeerCheckErrorEventId).Return((<-chan entities.Event)(errorChannel))
31+
eventBus.On("Subscribe", blockchain.NodePeerAlertSentEventId).Return((<-chan entities.Event)(alertChannel))
32+
33+
watcher := monitoring.NewPeerMetricsWatcher(appMetrics, eventBus)
34+
go watcher.Start()
35+
36+
checkChannel <- blockchain.NodePeerCheckEvent{
37+
BaseEvent: entities.NewBaseEvent(blockchain.NodePeerCheckEventId),
38+
NodeType: entities.NodeTypeBitcoin,
39+
CurrentPeers: 2,
40+
MinPeers: 3,
41+
BelowThreshold: true,
42+
}
43+
errorChannel <- blockchain.NodePeerCheckErrorEvent{
44+
BaseEvent: entities.NewBaseEvent(blockchain.NodePeerCheckErrorEventId),
45+
NodeType: entities.NodeTypeBitcoin,
46+
}
47+
alertChannel <- blockchain.NodePeerAlertSentEvent{
48+
BaseEvent: entities.NewBaseEvent(blockchain.NodePeerAlertSentEventId),
49+
NodeType: entities.NodeTypeBitcoin,
50+
}
51+
52+
assert.Eventually(t, func() bool {
53+
return getGaugeVecValue(appMetrics.NodePeerCountMetric, "bitcoin") == 2 &&
54+
getGaugeVecValue(appMetrics.NodePeerMinThresholdMetric, "bitcoin") == 3 &&
55+
getGaugeVecValue(appMetrics.NodePeerBelowThreshold, "bitcoin") == 1 &&
56+
getCounterVecValue(appMetrics.NodePeerCheckErrors, "bitcoin") == 1 &&
57+
getCounterVecValue(appMetrics.NodePeerAlerts, "bitcoin") == 1
58+
}, time.Second, 20*time.Millisecond)
59+
60+
shutdown := make(chan bool, 1)
61+
watcher.Shutdown(shutdown)
62+
<-shutdown
63+
eventBus.AssertExpectations(t)
64+
}
65+
66+
func TestPeerMetricsWatcher_Shutdown(t *testing.T) {
67+
appMetrics := monitoring.NewMetrics(prometheus.NewRegistry())
68+
eventBus := &mocks.EventBusMock{}
69+
eventBus.On("Subscribe", blockchain.NodePeerCheckEventId).Return(make(<-chan entities.Event))
70+
eventBus.On("Subscribe", blockchain.NodePeerCheckErrorEventId).Return(make(<-chan entities.Event))
71+
eventBus.On("Subscribe", blockchain.NodePeerAlertSentEventId).Return(make(<-chan entities.Event))
72+
73+
watcher := monitoring.NewPeerMetricsWatcher(appMetrics, eventBus)
74+
go watcher.Start()
75+
time.Sleep(10 * time.Millisecond)
76+
77+
closeChannel := make(chan bool, 1)
78+
watcher.Shutdown(closeChannel)
79+
select {
80+
case <-closeChannel:
81+
case <-time.After(time.Second):
82+
t.Fatal("Shutdown timed out")
83+
}
84+
eventBus.AssertExpectations(t)
85+
}

0 commit comments

Comments
 (0)