Skip to content

Commit bfd4001

Browse files
authored
[KLC-2402] appStatusPolling: make Poll stoppable; wire shutdown (#57)
* [KLC-2402] appStatusPolling: make Poll stoppable; wire shutdown Replaces the unbounded `for { Sleep; ... }` goroutine in AppStatusPolling with a select over a `done` channel and a ticker, exposing an idempotent `Close() error` (io.Closer). Killed a pre-existing nil-pointer panic in TestStartStatusPolling/Valid_configuration, where the leaked goroutine outlived the test and fired against a stub with a nil method handler. API: - metrics.StartStatusPolling, StartNodeMetricsPolling, and StartMachineStatisticsPolling now return (io.Closer, error). Shutdown: - cmd/node/startup.go captures the three closers and passes them to closeAllComponents, which now Closes them first — before tearing down the components their handlers read from (NetMessenger, Store, etc.). - Close signals stop but does not block on the goroutine, per the ticket's requirement. Tests: - matics_test.go and appStatusPolling_test.go register t.Cleanup to Close pollers, eliminating the test goroutine leak. - Added TestAppStatusPolling_Close_IsIdempotent and TestAppStatusPolling_Close_StopsGoroutine. Out of scope (separate ticket): the leaky `for {}` goroutines inside registerCPUStatistics / registerNetStatistics / registerDiskStatistics. * fix(KLC-2402): address review feedback - StartStatusPolling: check.IfNil(ash) for typed-nil safety - StartMachineStatisticsPolling: add check.IfNil(notifier) guard - Wrap NewAppStatusPolling error with %w in both Start* wrappers - Test: fix duration multiplication typo (~63yr -> 2s) - Test: use require.Eventually for first-tick wait; settling-window check for post-Close stability
1 parent 9c6c33b commit bfd4001

6 files changed

Lines changed: 146 additions & 43 deletions

File tree

cmd/node/metrics/machineStatistics.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package metrics
22

33
import (
44
"errors"
5+
"fmt"
6+
"io"
57
"path/filepath"
68
"time"
79

@@ -13,40 +15,45 @@ import (
1315
"github.com/klever-io/klever-go/tools/check"
1416
)
1517

16-
// StartMachineStatisticsPolling will start read information about current running machine
17-
func StartMachineStatisticsPolling(ash core.AppStatusHandler, notifier eventNotifier.EpochStartEventNotifier, pollingInterval time.Duration, workingDir string) error {
18+
// StartMachineStatisticsPolling starts reading information about the currently running machine.
19+
// The returned io.Closer stops the AppStatusPolling goroutine; the inner goroutines in
20+
// registerCPUStatistics / registerNetStatistics / registerDiskStatistics are not stopped by it and still leak.
21+
func StartMachineStatisticsPolling(ash core.AppStatusHandler, notifier eventNotifier.EpochStartEventNotifier, pollingInterval time.Duration, workingDir string) (io.Closer, error) {
1822
if check.IfNil(ash) {
19-
return errors.New("nil AppStatusHandler")
23+
return nil, errors.New("nil AppStatusHandler")
24+
}
25+
if check.IfNil(notifier) {
26+
return nil, errors.New("nil EpochStartEventNotifier")
2027
}
2128

2229
appStatusPollingHandler, err := appStatusPolling.NewAppStatusPolling(ash, pollingInterval)
2330
if err != nil {
24-
return errors.New("cannot init AppStatusPolling")
31+
return nil, fmt.Errorf("cannot init AppStatusPolling: %w", err)
2532
}
2633

2734
err = registerCPUStatistics(appStatusPollingHandler)
2835
if err != nil {
29-
return err
36+
return nil, err
3037
}
3138

3239
err = registerMemStatistics(appStatusPollingHandler)
3340
if err != nil {
34-
return err
41+
return nil, err
3542
}
3643

3744
err = registerNetStatistics(appStatusPollingHandler, notifier)
3845
if err != nil {
39-
return err
46+
return nil, err
4047
}
4148

4249
err = registerDiskStatistics(appStatusPollingHandler, workingDir)
4350
if err != nil {
44-
return err
51+
return nil, err
4552
}
4653

4754
appStatusPollingHandler.Poll()
4855

49-
return nil
56+
return appStatusPollingHandler, nil
5057
}
5158

5259
func registerMemStatistics(appStatusPollingHandler *appStatusPolling.AppStatusPolling) error {

cmd/node/metrics/matics_test.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ func TestStartStatusPolling(t *testing.T) {
262262
for _, tt := range tests {
263263
t.Run(tt.name, func(t *testing.T) {
264264
// Call the function to test
265-
err := StartStatusPolling(
265+
closer, err := StartStatusPolling(
266266
tt.appStatusHandler,
267267
time.Second,
268268
tt.networkComponents,
@@ -272,8 +272,11 @@ func TestStartStatusPolling(t *testing.T) {
272272
// Assert the result
273273
if tt.expectedError {
274274
assert.Error(t, err)
275+
assert.Nil(t, closer)
275276
} else {
276277
assert.Nil(t, err)
278+
require.NotNil(t, closer)
279+
t.Cleanup(func() { _ = closer.Close() })
277280
}
278281
})
279282
}
@@ -451,6 +454,7 @@ func TestRegisterPollConnectedPeers(t *testing.T) {
451454
assert.NotPanics(t, func() {
452455
pollingHandler.Poll()
453456
})
457+
t.Cleanup(func() { _ = pollingHandler.Close() })
454458
}
455459

456460
// TestRegisterPollProbableHighestNonce tests the function that registers a polling function for the probable highest nonce
@@ -485,6 +489,7 @@ func TestRegisterPollProbableHighestNonce(t *testing.T) {
485489
assert.NotPanics(t, func() {
486490
pollingHandler.Poll()
487491
})
492+
t.Cleanup(func() { _ = pollingHandler.Close() })
488493

489494
// Optionally, verify that the metric gets set when Poll() is called
490495
// This requires a metricsMap to track what values are set
@@ -498,6 +503,7 @@ func TestRegisterPollProbableHighestNonce(t *testing.T) {
498503
pollingWithMetricsHandler, _ := appStatusPolling.NewAppStatusPolling(metricsHandler, time.Second)
499504
_ = registerPollProbableHighestNonce(pollingWithMetricsHandler, processComponents)
500505
pollingWithMetricsHandler.Poll()
506+
t.Cleanup(func() { _ = pollingWithMetricsHandler.Close() })
501507

502508
// Verify that the metric was set correctly
503509
// Note: This might not be reliable depending on the implementation of AppStatusPolling
@@ -553,16 +559,18 @@ func TestStartNodeMetricsPolling_NilHandler(t *testing.T) {
553559
t.Parallel()
554560

555561
redundancyHandler := &consensusMock.NodeRedundancyHandlerStub{}
556-
err := StartNodeMetricsPolling(nil, time.Second, redundancyHandler)
562+
closer, err := StartNodeMetricsPolling(nil, time.Second, redundancyHandler)
557563
assert.Error(t, err)
564+
assert.Nil(t, closer)
558565
}
559566

560567
func TestStartNodeMetricsPolling_NilRedundancyHandler(t *testing.T) {
561568
t.Parallel()
562569

563570
handler := &mock.AppStatusHandlerStub{}
564-
err := StartNodeMetricsPolling(handler, time.Second, nil)
571+
closer, err := StartNodeMetricsPolling(handler, time.Second, nil)
565572
assert.Error(t, err)
573+
assert.Nil(t, closer)
566574
}
567575

568576
func TestStartNodeMetricsPolling_Valid(t *testing.T) {
@@ -583,8 +591,10 @@ func TestStartNodeMetricsPolling_Valid(t *testing.T) {
583591
GetSlotsOfInactivityCalled: func() uint64 { return 3 },
584592
}
585593

586-
err := StartNodeMetricsPolling(handler, time.Second, redundancyHandler)
594+
closer, err := StartNodeMetricsPolling(handler, time.Second, redundancyHandler)
587595
assert.NoError(t, err)
596+
require.NotNil(t, closer)
597+
t.Cleanup(func() { _ = closer.Close() })
588598

589599
// Start timestamp should be set immediately (non-zero)
590600
mu.Lock()

cmd/node/metrics/metrics.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package metrics
33
import (
44
"errors"
55
"fmt"
6+
"io"
67
"sort"
78
"strings"
89
"time"
@@ -122,41 +123,42 @@ func SaveStringMetric(ash core.AppStatusHandler, key, value string) {
122123
ash.SetStringValue(key, value)
123124
}
124125

125-
// StartStatusPolling will start save information in status handler about network
126+
// StartStatusPolling starts saving network information in the status handler.
127+
// The returned io.Closer stops the polling goroutine.
126128
func StartStatusPolling(
127129
ash core.AppStatusHandler,
128130
pollingInterval time.Duration,
129131
networkComponents *factory.NetworkComponents,
130132
processComponents *factory.Process,
131-
) error {
132-
if ash == nil {
133-
return errors.New("nil AppStatusHandler")
133+
) (io.Closer, error) {
134+
if check.IfNil(ash) {
135+
return nil, errors.New("nil AppStatusHandler")
134136
}
135137
if networkComponents == nil {
136-
return errors.New("nil networkComponents")
138+
return nil, errors.New("nil networkComponents")
137139
}
138140
if processComponents == nil {
139-
return errors.New("nil processComponents")
141+
return nil, errors.New("nil processComponents")
140142
}
141143

142144
appStatusPollingHandler, err := appStatusPolling.NewAppStatusPolling(ash, pollingInterval)
143145
if err != nil {
144-
return errors.New("cannot init AppStatusPolling")
146+
return nil, fmt.Errorf("cannot init AppStatusPolling: %w", err)
145147
}
146148

147149
err = registerPollConnectedPeers(appStatusPollingHandler, networkComponents)
148150
if err != nil {
149-
return err
151+
return nil, err
150152
}
151153

152154
err = registerPollProbableHighestNonce(appStatusPollingHandler, processComponents)
153155
if err != nil {
154-
return err
156+
return nil, err
155157
}
156158

157159
appStatusPollingHandler.Poll()
158160

159-
return nil
161+
return appStatusPollingHandler, nil
160162
}
161163

162164
func registerPollConnectedPeers(
@@ -234,38 +236,39 @@ func setCurrentP2pNodeAddresses(
234236

235237
// StartNodeMetricsPolling starts polling for uptime and redundancy metrics on a single shared
236238
// polling goroutine, avoiding the overhead of separate AppStatusPolling instances for each.
239+
// The returned io.Closer stops the polling goroutine.
237240
func StartNodeMetricsPolling(
238241
ash core.AppStatusHandler,
239242
pollingInterval time.Duration,
240243
redundancyHandler consensus.NodeRedundancyHandler,
241-
) error {
244+
) (io.Closer, error) {
242245
if check.IfNil(ash) {
243-
return errors.New("nil AppStatusHandler")
246+
return nil, errors.New("nil AppStatusHandler")
244247
}
245248
if check.IfNil(redundancyHandler) {
246-
return errors.New("nil NodeRedundancyHandler")
249+
return nil, errors.New("nil NodeRedundancyHandler")
247250
}
248251

249252
startTime := time.Now()
250253
ash.SetUInt64Value(core.MetricNodeStartTimestamp, uint64(startTime.Unix())) // #nosec G115
251254

252255
appStatusPollingHandler, err := appStatusPolling.NewAppStatusPolling(ash, pollingInterval)
253256
if err != nil {
254-
return fmt.Errorf("cannot init AppStatusPolling for node metrics: %w", err)
257+
return nil, fmt.Errorf("cannot init AppStatusPolling for node metrics: %w", err)
255258
}
256259

257260
pollingFunc := buildNodeMetricsPollingFunc(startTime, redundancyHandler)
258261
err = appStatusPollingHandler.RegisterPollingFunc(pollingFunc)
259262
if err != nil {
260-
return fmt.Errorf("cannot register node metrics polling function: %w", err)
263+
return nil, fmt.Errorf("cannot register node metrics polling function: %w", err)
261264
}
262265

263266
// Prime before the recurring poll so /node/status returns the real level on
264267
// request 1 (AppStatusPolling.Poll waits one full polling interval before its first tick).
265268
pollingFunc(ash)
266269

267270
appStatusPollingHandler.Poll()
268-
return nil
271+
return appStatusPollingHandler, nil
269272
}
270273

271274
func buildNodeMetricsPollingFunc(

cmd/node/startup.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -967,7 +967,7 @@ func startNode(ctx *cli.Context, log logger.Logger, version string) error {
967967

968968
log.Trace("starting status pooling components")
969969
statusPollingInterval := time.Duration(cfg.Preferences.StatusPollingIntervalSec) * time.Second
970-
err = metrics.StartStatusPolling(
970+
statusPollingCloser, err := metrics.StartStatusPolling(
971971
currentNode.GetAppStatusHandler(),
972972
statusPollingInterval,
973973
networkComponents,
@@ -978,16 +978,19 @@ func startNode(ctx *cli.Context, log logger.Logger, version string) error {
978978
}
979979

980980
updateMachineStatisticsDuration := time.Second
981-
err = metrics.StartMachineStatisticsPolling(coreComponents.StatusHandler, epochStartNotifier, updateMachineStatisticsDuration, workingDir)
981+
machineStatsCloser, err := metrics.StartMachineStatisticsPolling(coreComponents.StatusHandler, epochStartNotifier, updateMachineStatisticsDuration, workingDir)
982982
if err != nil {
983983
return err
984984
}
985985

986-
err = metrics.StartNodeMetricsPolling(coreComponents.StatusHandler, statusPollingInterval, nodeRedundancy)
986+
nodeMetricsCloser, err := metrics.StartNodeMetricsPolling(coreComponents.StatusHandler, statusPollingInterval, nodeRedundancy)
987987
if err != nil {
988988
return err
989989
}
990990

991+
// Pollers are independent; order doesn't matter.
992+
pollingClosers := []io.Closer{statusPollingCloser, machineStatsCloser, nodeMetricsCloser}
993+
991994
log.Trace("creating klever node facade")
992995
restAPIServerDebugMode := ctx.GlobalBool(restAPIDebug.Name)
993996

@@ -1041,7 +1044,7 @@ func startNode(ctx *cli.Context, log logger.Logger, version string) error {
10411044

10421045
chanCloseComponents := make(chan struct{})
10431046
go func() {
1044-
closeAllComponents(log, healthService, dataComponents, triesComponents, networkComponents, chanCloseComponents)
1047+
closeAllComponents(log, healthService, dataComponents, triesComponents, networkComponents, pollingClosers, chanCloseComponents)
10451048
}()
10461049

10471050
_ = tracing.Shutdown() // errors logged internally
@@ -1134,8 +1137,18 @@ func closeAllComponents(
11341137
dataComponents *factory.DataComponents,
11351138
triesComponents *factory.TriesComponents,
11361139
networkComponents *factory.NetworkComponents,
1140+
pollingClosers []io.Closer,
11371141
chanCloseComponents chan struct{},
11381142
) {
1143+
// Stop polling first so handlers cannot fire against components being torn down below.
1144+
log.Debug("closing status polling goroutines...")
1145+
for _, c := range pollingClosers {
1146+
if c == nil {
1147+
continue
1148+
}
1149+
log.LogIfError(c.Close())
1150+
}
1151+
11391152
log.Debug("closing health service...")
11401153
err := healthService.Close()
11411154
log.LogIfError(err)

core/appStatusPolling/appStatusPolling.go

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type AppStatusPolling struct {
1616
mutRegisteredFunc sync.RWMutex
1717
registeredFunctions []func(appStatusHandler core.AppStatusHandler)
1818
appStatusHandler core.AppStatusHandler
19+
done chan struct{}
20+
closeOnce sync.Once
1921
}
2022

2123
// NewAppStatusPolling will return an instance of AppStatusPolling
@@ -29,6 +31,7 @@ func NewAppStatusPolling(appStatusHandler core.AppStatusHandler, pollingDuration
2931
return &AppStatusPolling{
3032
pollingDuration: pollingDuration,
3133
appStatusHandler: appStatusHandler,
34+
done: make(chan struct{}),
3235
}, nil
3336
}
3437

@@ -43,17 +46,32 @@ func (asp *AppStatusPolling) RegisterPollingFunc(handler func(appStatusHandler c
4346
return nil
4447
}
4548

46-
// Poll will notify the AppStatusHandler at a given time
49+
// Poll starts a goroutine that calls every registered polling function once per polling interval. The goroutine runs
50+
// until Close is called.
4751
func (asp *AppStatusPolling) Poll() {
4852
go func() {
49-
for {
50-
time.Sleep(asp.pollingDuration)
53+
ticker := time.NewTicker(asp.pollingDuration)
54+
defer ticker.Stop()
5155

52-
asp.mutRegisteredFunc.RLock()
53-
for _, handler := range asp.registeredFunctions {
54-
handler(asp.appStatusHandler)
56+
for {
57+
select {
58+
case <-asp.done:
59+
return
60+
case <-ticker.C:
61+
asp.mutRegisteredFunc.RLock()
62+
for _, handler := range asp.registeredFunctions {
63+
handler(asp.appStatusHandler)
64+
}
65+
asp.mutRegisteredFunc.RUnlock()
5566
}
56-
asp.mutRegisteredFunc.RUnlock()
5767
}
5868
}()
5969
}
70+
71+
// Close signals the polling goroutine to stop; it does not wait for the goroutine to exit. Idempotent; always returns nil.
72+
func (asp *AppStatusPolling) Close() error {
73+
asp.closeOnce.Do(func() {
74+
close(asp.done)
75+
})
76+
return nil
77+
}

0 commit comments

Comments
 (0)