Commit 819b015

fix: health endpoint now detects stopped block production
This commit fixes issue #2643 where the health endpoint still reports OK when a node has stopped producing blocks.

Changes:
- Updated HealthServer to accept store, config, and logger dependencies
- Implemented block production monitoring in the Livez endpoint:
  * For aggregator nodes, checks if LastBlockTime is recent
  * Returns WARN if block production is slow (> 3x block time)
  * Returns FAIL if block production has stopped (> 5x block time)
  * Uses LazyBlockInterval for lazy mode aggregators
  * Non-aggregator nodes continue to return PASS
- Added constants for health check thresholds:
  * healthCheckWarnMultiplier = 3
  * healthCheckFailMultiplier = 5
- Added comprehensive unit tests covering all scenarios:
  Server tests (pkg/rpc/server/server_test.go):
  * Non-aggregator nodes
  * Aggregator with no blocks
  * Aggregator with recent blocks (PASS)
  * Aggregator with slow production (WARN)
  * Aggregator with stopped production (FAIL)
  * Lazy aggregator with correct thresholds
  * Error handling
  Client tests (pkg/rpc/client/client_test.go):
  * Non-aggregator returns PASS
  * Aggregator with recent blocks returns PASS
  * Aggregator with slow block production returns WARN
  * Aggregator with stopped block production returns FAIL
- Updated setupTestServer to pass new dependencies
- Added createCustomTestServer helper for testing with custom configs

The thresholds are configurable based on the node's BlockTime or LazyBlockInterval settings, making the health check adaptive to different node configurations.

Fixes #2643
1 parent 31859a7 commit 819b015
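To make the thresholds concrete: the Livez check described above only compares the time since the last block against two multiples of the configured block time. The snippet below is a minimal standalone sketch of that classification, not the code from this commit, and the one-second block time is purely illustrative:

package main

import (
    "fmt"
    "time"
)

const (
    warnMultiplier = 3 // mirrors healthCheckWarnMultiplier from this commit
    failMultiplier = 5 // mirrors healthCheckFailMultiplier from this commit
)

// classify reproduces the decision described in the commit message:
// PASS while blocks are recent, WARN past 3x block time, FAIL past 5x.
func classify(sinceLastBlock, blockTime time.Duration) string {
    switch {
    case sinceLastBlock > blockTime*failMultiplier:
        return "FAIL"
    case sinceLastBlock > blockTime*warnMultiplier:
        return "WARN"
    default:
        return "PASS"
    }
}

func main() {
    blockTime := time.Second // illustrative only; a real node uses config.Node.BlockTime (or LazyBlockInterval in lazy mode)
    for _, since := range []time.Duration{500 * time.Millisecond, 4 * time.Second, 10 * time.Second} {
        fmt.Printf("%s after %v without a block\n", classify(since, blockTime), since)
    }
}

With a 1s block time this prints PASS, WARN, and FAIL for the three sample gaps, matching the 3x and 5x multipliers the diff adds in pkg/rpc/server/server.go.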

File tree

5 files changed: +389 additions, -26 deletions


node/helpers_test.go

Lines changed: 6 additions & 0 deletions

@@ -24,6 +24,7 @@ import (
     evconfig "github.com/evstack/ev-node/pkg/config"
     "github.com/evstack/ev-node/pkg/p2p"
     "github.com/evstack/ev-node/pkg/p2p/key"
+    rpcclient "github.com/evstack/ev-node/pkg/rpc/client"
     remote_signer "github.com/evstack/ev-node/pkg/signer/noop"
     "github.com/evstack/ev-node/types"
 )
@@ -317,3 +318,8 @@ func verifyNodesSynced(node1, syncingNode Node, source Source) error {
         return fmt.Errorf("nodes not synced: sequencer at height %v, syncing node at height %v", sequencerHeight, syncingHeight)
     })
 }
+
+// NewRPCClient creates a new RPC client for testing
+func NewRPCClient(address string) *rpcclient.Client {
+    return rpcclient.NewClient("http://" + address)
+}

node/single_sequencer_integration_test.go

Lines changed: 67 additions & 0 deletions

@@ -418,3 +418,70 @@ func waitForBlockN(t *testing.T, n uint64, node *FullNode, blockInterval time.Du
         return got >= n
     }, timeout[0], blockInterval/2)
 }
+
+// TestHealthEndpointWhenBlockProductionStops verifies that the health endpoint
+// correctly reports WARN and FAIL states when an aggregator stops producing blocks.
+func TestHealthEndpointWhenBlockProductionStops(t *testing.T) {
+    require := require.New(t)
+
+    // Set up configuration with specific block time for predictable health checks
+    config := getTestConfig(t, 1)
+    config.Node.Aggregator = true
+    config.Node.BlockTime = evconfig.DurationWrapper{Duration: 500 * time.Millisecond}
+    config.Node.MaxPendingHeadersAndData = 2
+
+    // Set DA block time large enough to avoid header submission to DA layer.
+    // This will cause block production to stop once MaxPendingHeadersAndData is reached.
+    config.DA.BlockTime = evconfig.DurationWrapper{Duration: 100 * time.Second}
+
+    node, cleanup := createNodeWithCleanup(t, config)
+    defer cleanup()
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    var runningWg sync.WaitGroup
+    startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0, nil)
+
+    // Wait for first block to be produced
+    waitForBlockN(t, 1, node, config.Node.BlockTime.Duration)
+
+    // Create RPC client
+    rpcClient := NewRPCClient(config.RPC.Address)
+
+    // Verify health is PASS while blocks are being produced
+    health, err := rpcClient.GetHealth(ctx)
+    require.NoError(err)
+    require.Equal("PASS", health.String(), "Health should be PASS while producing blocks")
+
+    // Wait for block production to stop (when MaxPendingHeadersAndData is reached)
+    time.Sleep(time.Duration(config.Node.MaxPendingHeadersAndData+2) * config.Node.BlockTime.Duration)
+
+    // Get the height to confirm blocks stopped
+    height, err := getNodeHeight(node, Store)
+    require.NoError(err)
+    require.LessOrEqual(height, config.Node.MaxPendingHeadersAndData)
+
+    // Health check threshold calculations:
+    //   blockTime     = 500ms
+    //   warnThreshold = blockTime * 3 = 1500ms = 1.5s
+    //   failThreshold = blockTime * 5 = 2500ms = 2.5s
+
+    // Wait for WARN threshold (3x block time = 1.5 seconds after last block).
+    // We need to wait a bit longer to account for the time blocks take to stop.
+    time.Sleep(1700 * time.Millisecond)
+
+    health, err = rpcClient.GetHealth(ctx)
+    require.NoError(err)
+    // Health could be WARN or FAIL depending on exact timing, but should not be PASS
+    require.NotEqual("PASS", health.String(), "Health should not be PASS after block production stops")
+
+    // Wait for FAIL threshold (5x block time = 2.5 seconds total after last block)
+    time.Sleep(1500 * time.Millisecond)
+
+    health, err = rpcClient.GetHealth(ctx)
+    require.NoError(err)
+    require.Equal("FAIL", health.String(), "Health should be FAIL after 5x block time without new blocks")
+
+    // Stop the node and wait for shutdown
+    shutdownAndWait(t, []context.CancelFunc{cancel}, &runningWg, 10*time.Second)
+}

pkg/rpc/client/client_test.go

Lines changed: 118 additions & 17 deletions

@@ -30,13 +30,12 @@ func setupTestServer(t *testing.T, mockStore *mocks.MockStore, mockP2P *mocks.Mo
 
     // Create the servers
     logger := zerolog.Nop()
-    storeServer := server.NewStoreServer(mockStore, logger)
-    p2pServer := server.NewP2PServer(mockP2P)
-    healthServer := server.NewHealthServer()
-
-    // Create config server with test config
     testConfig := config.DefaultConfig()
     testConfig.DA.Namespace = "test-headers"
+
+    storeServer := server.NewStoreServer(mockStore, logger)
+    p2pServer := server.NewP2PServer(mockP2P)
+    healthServer := server.NewHealthServer(mockStore, testConfig, logger)
     configServer := server.NewConfigServer(testConfig, nil, logger)
 
     // Register the store service
@@ -242,21 +241,123 @@ func TestClientGetNetInfo(t *testing.T) {
 }
 
 func TestClientGetHealth(t *testing.T) {
-    // Create mocks
-    mockStore := mocks.NewMockStore(t)
-    mockP2P := mocks.NewMockP2PRPC(t)
+    t.Run("non-aggregator returns PASS", func(t *testing.T) {
+        mockStore := mocks.NewMockStore(t)
+        mockP2P := mocks.NewMockP2PRPC(t)
+
+        testServer, client := setupTestServer(t, mockStore, mockP2P)
+        defer testServer.Close()
+
+        healthStatus, err := client.GetHealth(context.Background())
+
+        require.NoError(t, err)
+        require.Equal(t, "PASS", healthStatus.String())
+    })
+
+    t.Run("aggregator with recent blocks returns PASS", func(t *testing.T) {
+        mockStore := mocks.NewMockStore(t)
+        mockP2P := mocks.NewMockP2PRPC(t)
+
+        // Setup aggregator config
+        testConfig := config.DefaultConfig()
+        testConfig.Node.Aggregator = true
+        testConfig.Node.BlockTime.Duration = 1 * time.Second
+
+        // Create state with recent block
+        state := types.State{
+            LastBlockHeight: 100,
+            LastBlockTime:   time.Now().Add(-500 * time.Millisecond),
+        }
+        mockStore.On("GetState", mock.Anything).Return(state, nil)
+
+        // Create custom test server with aggregator config
+        testServer := createCustomTestServer(t, mockStore, mockP2P, testConfig)
+        defer testServer.Close()
+
+        client := NewClient(testServer.URL)
+        healthStatus, err := client.GetHealth(context.Background())
+
+        require.NoError(t, err)
+        require.Equal(t, "PASS", healthStatus.String())
+        mockStore.AssertExpectations(t)
+    })
+
+    t.Run("aggregator with slow block production returns WARN", func(t *testing.T) {
+        mockStore := mocks.NewMockStore(t)
+        mockP2P := mocks.NewMockP2PRPC(t)
+
+        testConfig := config.DefaultConfig()
+        testConfig.Node.Aggregator = true
+        testConfig.Node.BlockTime.Duration = 1 * time.Second
+
+        // State with block older than 3x block time
+        state := types.State{
+            LastBlockHeight: 100,
+            LastBlockTime:   time.Now().Add(-4 * time.Second),
+        }
+        mockStore.On("GetState", mock.Anything).Return(state, nil)
+
+        testServer := createCustomTestServer(t, mockStore, mockP2P, testConfig)
+        defer testServer.Close()
+
+        client := NewClient(testServer.URL)
+        healthStatus, err := client.GetHealth(context.Background())
+
+        require.NoError(t, err)
+        require.Equal(t, "WARN", healthStatus.String())
+        mockStore.AssertExpectations(t)
+    })
+
+    t.Run("aggregator with stopped block production returns FAIL", func(t *testing.T) {
+        mockStore := mocks.NewMockStore(t)
+        mockP2P := mocks.NewMockP2PRPC(t)
+
+        testConfig := config.DefaultConfig()
+        testConfig.Node.Aggregator = true
+        testConfig.Node.BlockTime.Duration = 1 * time.Second
+
+        // State with block older than 5x block time
+        state := types.State{
+            LastBlockHeight: 100,
+            LastBlockTime:   time.Now().Add(-10 * time.Second),
+        }
+        mockStore.On("GetState", mock.Anything).Return(state, nil)
+
+        testServer := createCustomTestServer(t, mockStore, mockP2P, testConfig)
+        defer testServer.Close()
+
+        client := NewClient(testServer.URL)
+        healthStatus, err := client.GetHealth(context.Background())
+
+        require.NoError(t, err)
+        require.Equal(t, "FAIL", healthStatus.String())
+        mockStore.AssertExpectations(t)
+    })
+}
 
-    // Setup test server and client
-    testServer, client := setupTestServer(t, mockStore, mockP2P)
-    defer testServer.Close()
+// createCustomTestServer creates a test server with custom configuration
+func createCustomTestServer(t *testing.T, mockStore *mocks.MockStore, mockP2P *mocks.MockP2PRPC, testConfig config.Config) *httptest.Server {
+    mux := http.NewServeMux()
+
+    logger := zerolog.Nop()
+    storeServer := server.NewStoreServer(mockStore, logger)
+    p2pServer := server.NewP2PServer(mockP2P)
+    healthServer := server.NewHealthServer(mockStore, testConfig, logger)
+    configServer := server.NewConfigServer(testConfig, nil, logger)
 
-    // Call GetHealth
-    healthStatus, err := client.GetHealth(context.Background())
+    storePath, storeHandler := rpc.NewStoreServiceHandler(storeServer)
+    mux.Handle(storePath, storeHandler)
 
-    // Assert expectations
-    require.NoError(t, err)
-    // Health server always returns PASS in Livez
-    require.NotEqual(t, healthStatus.String(), "UNKNOWN")
+    p2pPath, p2pHandler := rpc.NewP2PServiceHandler(p2pServer)
+    mux.Handle(p2pPath, p2pHandler)
+
+    healthPath, healthHandler := rpc.NewHealthServiceHandler(healthServer)
+    mux.Handle(healthPath, healthHandler)
+
+    configPath, configHandler := rpc.NewConfigServiceHandler(configServer)
+    mux.Handle(configPath, configHandler)
+
+    return httptest.NewServer(h2c.NewHandler(mux, &http2.Server{}))
 }
 
 func TestClientGetNamespace(t *testing.T) {
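The subtests above exercise the client against an in-process httptest server. Against a running node, the same GetHealth call is all an external monitor needs; the sketch below assumes the rpcclient package from this diff and an illustrative RPC address (the real address comes from the node's RPC configuration):

package main

import (
    "context"
    "fmt"
    "time"

    rpcclient "github.com/evstack/ev-node/pkg/rpc/client"
)

func main() {
    // The address is illustrative; substitute the node's configured RPC listen address.
    client := rpcclient.NewClient("http://localhost:7331")

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // GetHealth calls the Livez RPC; the returned status stringifies to PASS, WARN, or FAIL.
    health, err := client.GetHealth(ctx)
    if err != nil {
        fmt.Println("health query failed:", err)
        return
    }
    fmt.Println("node health:", health.String())
}

Because the server now distinguishes slow production (WARN) from stopped production (FAIL), a monitor built on this call can alert on WARN and escalate on FAIL instead of treating every non-PASS response the same way.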

pkg/rpc/server/server.go

Lines changed: 72 additions & 5 deletions

@@ -28,6 +28,16 @@ import (
     rpc "github.com/evstack/ev-node/types/pb/evnode/v1/v1connect"
 )
 
+const (
+    // healthCheckWarnMultiplier is the multiplier for block time to determine WARN threshold
+    // If no block has been produced in (blockTime * healthCheckWarnMultiplier), return WARN
+    healthCheckWarnMultiplier = 3
+
+    // healthCheckFailMultiplier is the multiplier for block time to determine FAIL threshold
+    // If no block has been produced in (blockTime * healthCheckFailMultiplier), return FAIL
+    healthCheckFailMultiplier = 5
+)
+
 var _ rpc.StoreServiceHandler = (*StoreServer)(nil)
 
 // StoreServer implements the StoreService defined in the proto file
@@ -288,19 +298,76 @@ func (p *P2PServer) GetNetInfo(
 }
 
 // HealthServer implements the HealthService defined in the proto file
-type HealthServer struct{}
+type HealthServer struct {
+    store  store.Store
+    config config.Config
+    logger zerolog.Logger
+}
 
 // NewHealthServer creates a new HealthServer instance
-func NewHealthServer() *HealthServer {
-    return &HealthServer{}
+func NewHealthServer(store store.Store, config config.Config, logger zerolog.Logger) *HealthServer {
+    return &HealthServer{
+        store:  store,
+        config: config,
+        logger: logger,
+    }
 }
 
 // Livez implements the HealthService.Livez RPC
 func (h *HealthServer) Livez(
     ctx context.Context,
     req *connect.Request[emptypb.Empty],
 ) (*connect.Response[pb.GetHealthResponse], error) {
-    // always return healthy
+    // For aggregator nodes, check if block production is healthy
+    if h.config.Node.Aggregator {
+        state, err := h.store.GetState(ctx)
+        if err != nil {
+            h.logger.Error().Err(err).Msg("Failed to get state for health check")
+            return connect.NewResponse(&pb.GetHealthResponse{
+                Status: pb.HealthStatus_FAIL,
+            }), nil
+        }
+
+        // If we have blocks, check if the last block time is recent
+        if state.LastBlockHeight > 0 {
+            timeSinceLastBlock := time.Since(state.LastBlockTime)
+
+            // Calculate the threshold based on block time
+            blockTime := h.config.Node.BlockTime.Duration
+
+            // For lazy mode, use the lazy block interval instead
+            if h.config.Node.LazyMode {
+                blockTime = h.config.Node.LazyBlockInterval.Duration
+            }
+
+            warnThreshold := blockTime * healthCheckWarnMultiplier
+            failThreshold := blockTime * healthCheckFailMultiplier
+
+            if timeSinceLastBlock > failThreshold {
+                h.logger.Warn().
+                    Dur("time_since_last_block", timeSinceLastBlock).
+                    Dur("fail_threshold", failThreshold).
+                    Uint64("last_block_height", state.LastBlockHeight).
+                    Time("last_block_time", state.LastBlockTime).
+                    Msg("Health check: node has stopped producing blocks (FAIL)")
+                return connect.NewResponse(&pb.GetHealthResponse{
+                    Status: pb.HealthStatus_FAIL,
+                }), nil
+            } else if timeSinceLastBlock > warnThreshold {
+                h.logger.Warn().
+                    Dur("time_since_last_block", timeSinceLastBlock).
+                    Dur("warn_threshold", warnThreshold).
+                    Uint64("last_block_height", state.LastBlockHeight).
+                    Time("last_block_time", state.LastBlockTime).
+                    Msg("Health check: block production is slow (WARN)")
+                return connect.NewResponse(&pb.GetHealthResponse{
+                    Status: pb.HealthStatus_WARN,
+                }), nil
+            }
+        }
+    }
+
+    // For non-aggregator nodes or if checks pass, return healthy
     return connect.NewResponse(&pb.GetHealthResponse{
         Status: pb.HealthStatus_PASS,
     }), nil
@@ -310,7 +377,7 @@ func (h *HealthServer) Livez(
 func NewServiceHandler(store store.Store, peerManager p2p.P2PRPC, proposerAddress []byte, logger zerolog.Logger, config config.Config, bestKnown BestKnownHeightProvider) (http.Handler, error) {
     storeServer := NewStoreServer(store, logger)
     p2pServer := NewP2PServer(peerManager)
-    healthServer := NewHealthServer()
+    healthServer := NewHealthServer(store, config, logger)
     configServer := NewConfigServer(config, proposerAddress, logger)
 
     mux := http.NewServeMux()