Skip to content
55 changes: 53 additions & 2 deletions beacon/preconf/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,31 @@ type PayloadProvider interface {
GetPayloadBySlot(ctx context.Context, slot math.Slot, parentBlockRoot common.Root) (ctypes.BuiltExecutionPayloadEnv, error)
}

// SyncChecker exposes the node's sync status for health checks.
// Keeps the original signatures from the cometbft service interface.
type SyncChecker interface {
// IsAppReady returns nil if the chain is ready (at least one block has been committed).
// In case of error we set the server as not available.
IsAppReady() error
// GetSyncData returns the latest committed height and the target height being synced to.
GetSyncData() (latestHeight int64, syncToHeight int64)
}

// ELChecker exposes the execution-layer client's connectivity status.
type ELChecker interface {
// IsConnected returns true if the execution client is reachable.
IsConnected() bool
}

// Server is the preconf API server that serves GetPayload requests from validators.
type Server struct {
logger log.Logger
validatorJWTs ValidatorJWTs
whitelist Whitelist
preconfProposerTracker ProposerTracker
payloadProvider PayloadProvider
syncChecker SyncChecker
elChecker ELChecker
port int
metrics *serverMetrics

Expand All @@ -82,6 +100,8 @@ func NewServer(
whitelist Whitelist,
preconfProposerTracker ProposerTracker,
payloadProvider PayloadProvider,
syncChecker SyncChecker,
elChecker ELChecker,
port int,
sink TelemetrySink,
) *Server {
Expand All @@ -91,6 +111,8 @@ func NewServer(
whitelist: whitelist,
preconfProposerTracker: preconfProposerTracker,
payloadProvider: payloadProvider,
syncChecker: syncChecker,
elChecker: elChecker,
port: port,
metrics: newServerMetrics(sink),
}
Expand Down Expand Up @@ -162,13 +184,42 @@ func (s *Server) Stop() error {
return server.Shutdown(ctx)
}

// handleHealth just sends 200 OK to the health check endpoint.
// handleHealth checks sync status and returns 200 when the sequencer is synced
// and ready to produce blocks, or 503 when it is not.
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
s.writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
w.WriteHeader(http.StatusOK)

resp := s.buildHealthResponse()

w.Header().Set("Content-Type", "application/json")
healthy := resp.IsReady && !resp.IsSyncing && resp.ELConnected
if !healthy {
w.WriteHeader(http.StatusServiceUnavailable)
}
if err := json.NewEncoder(w).Encode(resp); err != nil {
s.logger.Error("Failed to encode health response", "error", err)
}
}

// buildHealthResponse inspects the node's sync state and EL connectivity
// and produces a HealthResponse.
func (s *Server) buildHealthResponse() *HealthResponse {
resp := new(HealthResponse)

if s.syncChecker != nil {
resp.IsReady = s.syncChecker.IsAppReady() == nil
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IsAppReady should return a boolean (in general any func thats named Is<something> should return bool) since the error value is never used.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree it's a bit weird but this is just the ConsensusService relaying the comet interface. I would avoid to wrap it with an extra layer

latestHeight, syncToHeight := s.syncChecker.GetSyncData()
resp.IsSyncing = syncToHeight > latestHeight
}

if s.elChecker != nil {
resp.ELConnected = s.elChecker.IsConnected()
}
Comment thread
bar-bera marked this conversation as resolved.

return resp
}

// handleGetPayload handles the GetPayload endpoint.
Expand Down
128 changes: 126 additions & 2 deletions beacon/preconf/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func TestServer_HandleGetPayload(t *testing.T) {
newTestWhitelist(t, pubkeyAHex, pubkeyBHex),
tracker,
provider,
&mockSyncChecker{ready: true},
&mockELChecker{connected: true},
0,
metrics.NewNoOpTelemetrySink(),
)
Expand Down Expand Up @@ -202,6 +204,8 @@ func TestServer_ProposerCheck(t *testing.T) {
newTestWhitelist(t, pubkeyAHex, pubkeyBHex),
tt.setupTracker(),
&mockPayloadProvider{hasPayload: true},
&mockSyncChecker{ready: true},
&mockELChecker{connected: true},
0,
metrics.NewNoOpTelemetrySink(),
)
Expand All @@ -225,7 +229,7 @@ func TestServer_ProposerCheck(t *testing.T) {
func TestServer_RejectsNonPostMethods(t *testing.T) {
t.Parallel()

server := preconf.NewServer(noop.NewLogger[any](), nil, nil, nil, nil, 0, metrics.NewNoOpTelemetrySink())
server := preconf.NewServer(noop.NewLogger[any](), nil, nil, nil, nil, nil, nil, 0, metrics.NewNoOpTelemetrySink())

for _, method := range []string{http.MethodGet, http.MethodPut, http.MethodDelete} {
req := httptest.NewRequest(method, preconf.PayloadEndpoint, nil)
Expand Down Expand Up @@ -268,7 +272,7 @@ func TestServer_OnSIGHUP(t *testing.T) {
wl, err := preconf.NewWhitelist(tmpFile)
require.NoError(t, err)

server := preconf.NewServer(noop.NewLogger[any](), nil, wl, nil, nil, 0, metrics.NewNoOpTelemetrySink())
server := preconf.NewServer(noop.NewLogger[any](), nil, wl, nil, nil, nil, nil, 0, metrics.NewNoOpTelemetrySink())

require.True(t, wl.IsWhitelisted(pkA))
require.False(t, wl.IsWhitelisted(pkB))
Expand Down Expand Up @@ -411,6 +415,8 @@ func TestServer_MetricsLabels(t *testing.T) {
newTestWhitelist(t, pubkeyAHex, pubkeyBHex),
tracker,
&mockPayloadProvider{hasPayload: tt.hasPayload, returnErr: tt.providerErr},
&mockSyncChecker{ready: true},
&mockELChecker{connected: true},
0,
sink,
)
Expand All @@ -433,3 +439,121 @@ func TestServer_MetricsLabels(t *testing.T) {
})
}
}

// mockSyncChecker implements preconf.SyncChecker for tests.
type mockSyncChecker struct {
ready bool
latestHeight int64
syncToHeight int64
}

func (m *mockSyncChecker) IsAppReady() error {
if !m.ready {
return errors.New("app not ready")
}
return nil
}

func (m *mockSyncChecker) GetSyncData() (int64, int64) {
return m.latestHeight, m.syncToHeight
}

// mockELChecker implements preconf.ELChecker for tests.
type mockELChecker struct {
connected bool
}

func (m *mockELChecker) IsConnected() bool {
return m.connected
}

func TestServer_HealthEndpoint(t *testing.T) {
t.Parallel()

tests := []struct {
name string
syncChecker *mockSyncChecker
elChecker *mockELChecker
wantStatus int
wantReady bool
wantSync bool
wantELConn bool
}{
{
name: "healthy - synced, ready, EL connected",
syncChecker: &mockSyncChecker{ready: true, latestHeight: 100, syncToHeight: 100},
elChecker: &mockELChecker{connected: true},
wantStatus: http.StatusOK,
wantReady: true,
wantSync: false,
wantELConn: true,
},
{
name: "unhealthy - still syncing",
syncChecker: &mockSyncChecker{ready: true, latestHeight: 50, syncToHeight: 100},
elChecker: &mockELChecker{connected: true},
wantStatus: http.StatusServiceUnavailable,
wantReady: true,
wantSync: true,
wantELConn: true,
},
{
name: "unhealthy - app not ready",
syncChecker: &mockSyncChecker{ready: false, latestHeight: 0, syncToHeight: 0},
elChecker: &mockELChecker{connected: true},
wantStatus: http.StatusServiceUnavailable,
wantReady: false,
wantSync: false,
wantELConn: true,
},
{
name: "unhealthy - EL disconnected",
syncChecker: &mockSyncChecker{ready: true, latestHeight: 100, syncToHeight: 100},
elChecker: &mockELChecker{connected: false},
wantStatus: http.StatusServiceUnavailable,
wantReady: true,
wantSync: false,
wantELConn: false,
},
{
name: "unhealthy - nil checkers",
wantStatus: http.StatusServiceUnavailable,
wantReady: false,
wantSync: false,
wantELConn: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

var syncChecker preconf.SyncChecker
if tt.syncChecker != nil {
syncChecker = tt.syncChecker
}
var elChecker preconf.ELChecker
if tt.elChecker != nil {
elChecker = tt.elChecker
}

server := preconf.NewServer(
noop.NewLogger[any](), nil, nil, nil, nil,
syncChecker, elChecker, 0, metrics.NewNoOpTelemetrySink(),
)

req := httptest.NewRequest(http.MethodGet, preconf.HealthEndpoint, nil)
rec := httptest.NewRecorder()
server.Handler().ServeHTTP(rec, req)

require.Equal(t, tt.wantStatus, rec.Code)

var resp preconf.HealthResponse
err := json.NewDecoder(rec.Body).Decode(&resp)
require.NoError(t, err)
require.Equal(t, tt.wantReady, resp.IsReady)
require.Equal(t, tt.wantSync, resp.IsSyncing)
require.Equal(t, tt.wantELConn, resp.ELConnected)
})
Comment thread
bar-bera marked this conversation as resolved.
}
}
14 changes: 14 additions & 0 deletions beacon/preconf/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,20 @@ func NewGetPayloadResponseFromEnvelope(env ctypes.BuiltExecutionPayloadEnv) *Get
}
}

// HealthResponse is the response body for the health endpoint.
// For richer sync metadata (head slot, sync distance), see the
// standard /eth/v1/node/syncing endpoint.
type HealthResponse struct {
// IsReady indicates whether the node has committed at least one block.
IsReady bool `json:"is_ready"`

// IsSyncing indicates whether the node is still catching up with the chain.
IsSyncing bool `json:"is_syncing"`

// ELConnected indicates whether the execution-layer client is reachable.
ELConnected bool `json:"el_connected"`
}

// ErrorResponse is the error response body.
type ErrorResponse struct {
// Code is the error code.
Expand Down
23 changes: 9 additions & 14 deletions execution/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"context"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"

"github.com/berachain/beacon-kit/errors"
Expand All @@ -49,10 +49,9 @@ type EngineClient struct {
metrics *clientMetrics
// capabilities is a map of capabilities that the execution client has.
capabilities map[string]struct{}
// connected will be set to true when we have successfully connected
// to the execution client.
connectedMu sync.RWMutex
connected bool
// connected reflects live EL reachability: flipped to true on any
// successful Engine API call, and to false on connection-level errors.
connected atomic.Bool
}

// New creates a new engine client EngineClient.
Expand Down Expand Up @@ -99,7 +98,6 @@ func New(
capabilities: make(map[string]struct{}),
eth1ChainID: eth1ChainID,
metrics: newClientMetrics(telemetrySink, logger),
connected: false,
}
}

Expand All @@ -123,9 +121,10 @@ func (s *EngineClient) Start(ctx context.Context) error {
"dial_url", s.cfg.RPCDialURL.String(),
)

// If the connection connection succeeds, we can skip the
// connection initialization loop.
// If the connection succeeds, we can skip the connection
// initialization loop.
if err := s.verifyChainIDAndConnection(ctx); err == nil {
s.connected.Store(true)
return nil
}

Expand All @@ -147,9 +146,7 @@ func (s *EngineClient) Start(ctx context.Context) error {
}
continue
}
s.connectedMu.Lock()
s.connected = true
s.connectedMu.Unlock()
s.connected.Store(true)
return nil
}
}
Expand All @@ -160,9 +157,7 @@ func (s *EngineClient) Stop() error {
}

func (s *EngineClient) IsConnected() bool {
s.connectedMu.RLock()
defer s.connectedMu.RUnlock()
return s.connected
return s.connected.Load()
}

func (s *EngineClient) HasCapability(capability string) bool {
Expand Down
3 changes: 3 additions & 0 deletions execution/client/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (s *EngineClient) NewPayload(
}
return nil, s.handleRPCError(err)
}
s.connected.Store(true)
if result == nil {
return nil, engineerrors.ErrNilPayloadStatus
}
Expand Down Expand Up @@ -107,6 +108,7 @@ func (s *EngineClient) ForkchoiceUpdated(
}
return nil, s.handleRPCError(err)
}
s.connected.Store(true)
if result == nil {
return nil, engineerrors.ErrNilForkchoiceResponse
}
Expand Down Expand Up @@ -144,6 +146,7 @@ func (s *EngineClient) GetPayload(
}
return result, s.handleRPCError(err)
}
s.connected.Store(true)
if result == nil {
// Engine API returns the Unknown Payload (-38001) error if a nil result is returned.
return result, engineerrors.ErrUnknownPayload
Expand Down
3 changes: 3 additions & 0 deletions execution/client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ func (s *EngineClient) handleRPCError(
var e jsonrpc.Error
ok := errors.As(err, &e)
if !ok || e == nil {
// No JSON-RPC response at all — the EL is unreachable. Mark the
// client as disconnected so the preconf health endpoint reflects it.
s.connected.Store(false)
return errors.Join(ErrBadConnection, err)
}

Expand Down
Loading
Loading