Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 70 additions & 17 deletions pkg/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func (h *Handler) Register(ctx context.Context, router *httprouter.Router) error

router.GET("/eth/v2/beacon/blocks/:block_id", h.wrappedHandler(h.handleEthV2BeaconBlocks))

router.GET("/eth/v2/debug/beacon/states/:state_id", h.wrappedHandler(h.handleEthV2DebugBeaconStates))
// Use streaming handler for states - they can be 100s of MB and we don't want to buffer them
router.GET("/eth/v2/debug/beacon/states/:state_id", h.wrappedStreamingHandler(h.handleEthV2DebugBeaconStatesStreaming))

router.GET("/checkpointz/v1/status", h.wrappedHandler(h.handleCheckpointzStatus))
router.GET("/checkpointz/v1/beacon/slots", h.wrappedHandler(h.handleCheckpointzBeaconSlots))
Expand Down Expand Up @@ -139,6 +140,43 @@ func (h *Handler) wrappedHandler(handler func(ctx context.Context, r *http.Reque
}
}

// StreamingHandler is a handler that writes directly to the ResponseWriter for large responses.
// This avoids buffering the entire response in memory, which is important for beacon states (~100s MB).
//
// A StreamingHandler may call w.WriteHeader itself to pick an error status before
// returning a non-nil error; wrappedStreamingHandler then only adds the error body.
type StreamingHandler func(ctx context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params, contentType ContentType) error

// streamingResponseWriter wraps an http.ResponseWriter and remembers which final
// status code was committed and whether a body write has been attempted, so the
// wrapper can report the real status and avoid corrupting a started response.
type streamingResponseWriter struct {
	http.ResponseWriter

	status      int  // final status code passed to the underlying writer
	wroteHeader bool // a final (non-1xx) status has been committed
	bodyStarted bool // Write has been called at least once
}

// WriteHeader forwards the first final status code and ignores later ones, which
// net/http would otherwise reject as superfluous.
func (s *streamingResponseWriter) WriteHeader(code int) {
	if s.wroteHeader {
		return
	}

	// Informational responses do not commit the final status.
	if code >= 100 && code < 200 && code != http.StatusSwitchingProtocols {
		s.ResponseWriter.WriteHeader(code)

		return
	}

	s.wroteHeader = true
	s.status = code
	s.ResponseWriter.WriteHeader(code)
}

// Write forwards b, recording the implicit 200 that net/http sends when no
// status was written beforehand.
func (s *streamingResponseWriter) Write(b []byte) (int, error) {
	if !s.wroteHeader {
		s.WriteHeader(http.StatusOK)
	}

	s.bodyStarted = true

	return s.ResponseWriter.Write(b)
}

// Unwrap exposes the underlying writer to http.ResponseController.
func (s *streamingResponseWriter) Unwrap() http.ResponseWriter {
	return s.ResponseWriter
}

// wrappedStreamingHandler adapts a StreamingHandler to an httprouter.Handle,
// adding request logging, request/response metrics and error reporting.
//
// The status reported to metrics is the one actually sent to the client. When
// the handler fails before committing a status, 500 is used. When the handler
// fails after body bytes were attempted, no error body is appended (the
// response is already in flight) and the failure is only logged.
func (h *Handler) wrappedStreamingHandler(handler StreamingHandler) httprouter.Handle {
	return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
		start := time.Now()

		contentType := NewContentTypeFromRequest(r)
		ctx := r.Context()
		registeredPath := deriveRegisteredPath(r, p)

		h.log.WithFields(logrus.Fields{
			"method":       r.Method,
			"path":         r.URL.Path,
			"content_type": contentType,
			"accept":       r.Header.Get("Accept"),
		}).Trace("Handling streaming request")

		h.metrics.ObserveRequest(r.Method, registeredPath)

		sw := &streamingResponseWriter{ResponseWriter: w}
		statusCode := http.StatusOK

		defer func() {
			h.metrics.ObserveResponse(r.Method, registeredPath, fmt.Sprintf("%v", statusCode), contentType.String(), time.Since(start))
		}()

		err := handler(ctx, sw, r, p, contentType)

		if sw.wroteHeader {
			statusCode = sw.status
		}

		if err == nil {
			return
		}

		if sw.bodyStarted {
			// Part of the payload may already be on the wire; appending an
			// error document would corrupt it.
			h.log.WithError(err).Error("Streaming response failed after body was started")

			return
		}

		if !sw.wroteHeader {
			statusCode = http.StatusInternalServerError

			// Headers prepared for the success payload must not describe the error body.
			sw.Header().Del("Content-Length")
			sw.Header().Del("Cache-Control")
		}

		if writeErr := WriteErrorResponse(sw, err.Error(), statusCode); writeErr != nil {
			h.log.WithError(writeErr).Error("Failed to write error response")
		}
	}
}

func (h *Handler) handleEthV1BeaconGenesis(ctx context.Context, r *http.Request, p httprouter.Params, contentType ContentType) (*HTTPResponse, error) {
if err := ValidateContentType(contentType, []ContentType{ContentTypeJSON}); err != nil {
return NewUnsupportedMediaTypeResponse(nil), err
Expand Down Expand Up @@ -199,45 +237,60 @@ func (h *Handler) handleEthV2BeaconBlocks(ctx context.Context, r *http.Request,
return rsp, nil
}

func (h *Handler) handleEthV2DebugBeaconStates(ctx context.Context, r *http.Request, p httprouter.Params, contentType ContentType) (*HTTPResponse, error) {
// handleEthV2DebugBeaconStatesStreaming streams beacon states directly to the ResponseWriter.
// This avoids buffering the entire state (~100s of MB) in memory before writing.
//
// Only SSZ is served. On failure the handler commits the status code itself
// (406 for an unsupported content type, 400 for an invalid state_id, 500 when
// the state cannot be loaded) and returns the error for the caller to report.
// Errors from the SSZ encoder are returned without a status being committed here.
func (h *Handler) handleEthV2DebugBeaconStatesStreaming(ctx context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params, contentType ContentType) error {
	if err := ValidateContentType(contentType, []ContentType{ContentTypeSSZ}); err != nil {
		w.WriteHeader(http.StatusNotAcceptable)

		return err
	}

	id, err := eth.NewStateIdentifier(p.ByName("state_id"))
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)

		return err
	}

	state, err := h.eth.BeaconState(ctx, id)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)

		return err
	}

	if state == nil {
		w.WriteHeader(http.StatusInternalServerError)

		return errors.New("state not found")
	}

	// Set headers before streaming
	header := w.Header()
	header.Set("Content-Type", ContentTypeSSZ.String())
	header.Set("Eth-Consensus-Version", state.Version.String())

	// Shared-cache lifetime depends on how stable the identifier is. The
	// directive is spelled "s-maxage"; "s-max-age" is not a Cache-Control directive.
	switch id.Type() {
	case eth.StateIDSlot, eth.StateIDRoot:
		header.Set("Cache-Control", "public, s-maxage=6000")
	case eth.StateIDFinalized:
		header.Set("Cache-Control", "public, s-maxage=180")
	case eth.StateIDHead:
		header.Set("Cache-Control", "public, s-maxage=30")
	}

	// Content-Length is best effort: if the size cannot be determined the
	// header is simply omitted.
	if size, sizeErr := h.sszEncoder.StateSizeSSZ(state); sizeErr == nil {
		header.Set("Content-Length", strconv.Itoa(size))
	}

	// Stream the SSZ-encoded state directly to the ResponseWriter
	written, err := h.sszEncoder.WriteStateSSZ(ctx, w, state)
	if err != nil && written == 0 {
		// Nothing of the state was written, so an error response may still
		// follow; it must not advertise the state's length or be cacheable.
		header.Del("Content-Length")
		header.Del("Cache-Control")
	}

	return err
}

func (h *Handler) handleEthV1ConfigSpec(ctx context.Context, r *http.Request, p httprouter.Params, contentType ContentType) (*HTTPResponse, error) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/beacon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type Config struct {
// HistoricalEpochCount determines how many historical epochs the provider will cache.
HistoricalEpochCount int `yaml:"historical_epoch_count" default:"20"`

// SSZEncodingMemoryBudgetBytes limits the total memory used for concurrent SSZ encoding
// operations (e.g., serving beacon states). Set to 0 to disable limiting (default).
// Example: 17179869184 (16GB) allows ~53 concurrent mainnet state encodings.
SSZEncodingMemoryBudgetBytes int64 `yaml:"ssz_encoding_memory_budget_bytes" default:"0"`

// Frontend holds configuration for the frontend.
Frontend FrontendConfig `yaml:"frontend"`
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/beacon/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewDefaultProvider(namespace string, log logrus.FieldLogger, nodes []node.C
historicalSlotFailures: make(map[phase0.Slot]int),

broker: emission.NewEmitter(),
sszEncoder: ssz.NewEncoder(config.CustomPreset),
sszEncoder: ssz.NewEncoder(config.CustomPreset, config.SSZEncodingMemoryBudgetBytes),
blocks: store.NewBlock(log, config.Caches.Blocks, namespace),
states: store.NewBeaconState(log, config.Caches.States, namespace),
depositSnapshots: store.NewDepositSnapshot(log, config.Caches.DepositSnapshots, namespace),
Expand Down Expand Up @@ -750,6 +750,7 @@ func (d *Default) ListFinalizedSlots(ctx context.Context) ([]phase0.Slot, error)

latestSlot := phase0.Slot(uint64(finality.Finalized.Epoch) * uint64(sp.SlotsPerEpoch))

//nolint:gosec // G115: HistoricalEpochCount is a small positive config value, safe to convert
for i, val := uint64(latestSlot), uint64(latestSlot)-uint64(sp.SlotsPerEpoch)*uint64(d.config.HistoricalEpochCount); i > val; i -= uint64(sp.SlotsPerEpoch) {
slots = append(slots, phase0.Slot(i))
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/beacon/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,12 @@ func (d *Default) fetchHistoricalCheckpoints(ctx context.Context, checkpoint *v1
// We'll derive the current finalized slot and then work back in intervals of SLOTS_PER_EPOCH.
currentSlot := uint64(checkpoint.Finalized.Epoch) * uint64(sp.SlotsPerEpoch)
for i := 1; i < d.config.HistoricalEpochCount; i++ {
//nolint:gosec // G115: i is a positive loop counter bounded by HistoricalEpochCount
if uint64(i)*uint64(sp.SlotsPerEpoch) > currentSlot {
break
}

//nolint:gosec // G115: i is a positive loop counter bounded by HistoricalEpochCount
slot := phase0.Slot(currentSlot - uint64(i)*uint64(sp.SlotsPerEpoch))

slotsInScope[slot] = struct{}{}
Expand Down
2 changes: 2 additions & 0 deletions pkg/beacon/expire_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ var (
)

// CalculateSlotExpiration returns the slot that lies slotsOfHistory slots after slot.
func CalculateSlotExpiration(slot phase0.Slot, slotsOfHistory int) phase0.Slot {
	//nolint:gosec // G115: slotsOfHistory is a small positive test value
	history := phase0.Slot(slotsOfHistory)

	return slot + history
}

// GetSlotTime returns genesis advanced by slot multiplied by secondsPerSlot.
func GetSlotTime(slot phase0.Slot, secondsPerSlot time.Duration, genesis time.Time) time.Time {
	//nolint:gosec // G115: slot values are bounded by beacon chain limits, safe for duration
	sinceGenesis := time.Duration(slot) * secondsPerSlot

	return genesis.Add(sinceGenesis)
}

Expand Down
159 changes: 158 additions & 1 deletion pkg/beacon/ssz/encoder.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,54 @@
package ssz

import (
"context"
"encoding/json"
"errors"
"io"
"sync"

"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethpandaops/beacon/pkg/beacon/state"
"golang.org/x/sync/semaphore"

dynssz "github.com/pk910/dynamic-ssz"
"github.com/pk910/dynamic-ssz/sszutils"
)

// sszBufferPool hands out reusable byte slices for SSZ encoding.
// Beacon states can be 100s of MB, so recycling the backing arrays keeps
// allocation volume and GC pressure down. Entries are stored as *[]byte.
var sszBufferPool = sync.Pool{
	New: func() any {
		// Fresh buffers start empty with 1 MiB of capacity; callers grow them as needed.
		const initialCapacity = 1 << 20

		buf := make([]byte, 0, initialCapacity)

		return &buf
	},
}

// Encoder SSZ-encodes beacon data, optionally bounding the memory used by
// concurrent encodings through a weighted semaphore.
type Encoder struct {
	// customPreset selects the dynamic-ssz marshalling path instead of the
	// generated fastssz marshaller when writing states.
	customPreset bool
	// dynssz is presumably the dynamic-ssz instance handed out by
	// getDynamicSSZ — NOTE(review): confirm against that method.
	dynssz *dynssz.DynSsz
	// spec is presumably the chain spec used to configure dynamic-ssz, with
	// specMtx guarding it — NOTE(review): confirm against the setters.
	spec    map[string]any
	specMtx sync.Mutex

	// memorySem limits total memory used for concurrent SSZ encoding.
	// Its weight unit is bytes of encoded state. If nil, no limit is applied.
	memorySem *semaphore.Weighted
}

func NewEncoder(customPreset bool) *Encoder {
// NewEncoder builds an SSZ encoder for the given preset mode.
// A positive memoryBudget caps the combined size, in bytes, of SSZ encodings
// that may be in progress at once; zero or a negative value leaves encoding unlimited.
func NewEncoder(customPreset bool, memoryBudget int64) *Encoder {
	enc := &Encoder{
		customPreset: customPreset,
	}

	if memoryBudget > 0 {
		enc.memorySem = semaphore.NewWeighted(memoryBudget)
	}

	return enc
}

Expand Down Expand Up @@ -213,3 +239,134 @@ func (e *Encoder) EncodeStateSSZ(beaconState *spec.VersionedBeaconState) (ssz []

return ssz, nil
}

// WriteStateSSZ encodes the beacon state as SSZ and writes directly to w.
// This uses a pooled buffer to reduce allocations for large states (~100s of MB).
// If the encoder was created with a memory budget, this method will block until
// sufficient memory is available and respects context cancellation.
// The memory budget is released immediately after encoding completes, allowing
// slow client connections to stream without holding the budget hostage.
// Returns the number of bytes written.
func (e *Encoder) WriteStateSSZ(ctx context.Context, w io.Writer, beaconState *spec.VersionedBeaconState) (int64, error) {
var stateObj sszutils.FastsszMarshaler

switch beaconState.Version {
case spec.DataVersionPhase0:
stateObj = beaconState.Phase0
case spec.DataVersionAltair:
stateObj = beaconState.Altair
case spec.DataVersionBellatrix:
stateObj = beaconState.Bellatrix
case spec.DataVersionCapella:
stateObj = beaconState.Capella
case spec.DataVersionDeneb:
stateObj = beaconState.Deneb
case spec.DataVersionElectra:
stateObj = beaconState.Electra
case spec.DataVersionFulu:
stateObj = beaconState.Fulu
default:
return 0, errors.New("unknown state version")
}

// Get the size upfront - needed for both memory budgeting and buffer allocation
size := stateObj.SizeSSZ()

// If memory budget is configured, acquire memory from the semaphore.
// Note: We release the semaphore after encoding, not after streaming.
// This allows slow clients to receive data without holding the budget.
if e.memorySem != nil {
if err := e.memorySem.Acquire(ctx, int64(size)); err != nil {
return 0, err
}
}

// For custom presets, fall back to regular encoding (dynamic-ssz doesn't support MarshalSSZTo)
if e.customPreset {
data, err := e.getDynamicSSZ().MarshalSSZ(stateObj)

// Release memory budget immediately after encoding (before streaming to client)
if e.memorySem != nil {
e.memorySem.Release(int64(size))
}

if err != nil {
return 0, err
}

n, err := w.Write(data)

return int64(n), err
}

// Acquire a pooled buffer
bufPtr, ok := sszBufferPool.Get().(*[]byte)
if !ok || bufPtr == nil {
// Pool returned unexpected type, allocate fresh buffer
b := make([]byte, 0, size)
bufPtr = &b
}

buf := *bufPtr

// Ensure buffer has enough capacity
if cap(buf) < size {
buf = make([]byte, 0, size)
} else {
buf = buf[:0]
}

// Marshal into the buffer
data, err := stateObj.MarshalSSZTo(buf)

// Release memory budget immediately after encoding (before streaming to client)
// This allows slow clients to receive data without holding the budget hostage.
if e.memorySem != nil {
e.memorySem.Release(int64(size))
}

if err != nil {
// Return buffer to pool even on error
*bufPtr = buf
sszBufferPool.Put(bufPtr)

return 0, err
}

// Write to the output (this can take a long time for slow clients, but we've
// already released the memory budget so other requests can proceed)
n, err := w.Write(data)

// Return buffer to pool
*bufPtr = buf
sszBufferPool.Put(bufPtr)

return int64(n), err
}

// StateSizeSSZ returns the SSZ encoded size of the beacon state without encoding it.
// Useful for setting Content-Length headers before streaming.
func (e *Encoder) StateSizeSSZ(beaconState *spec.VersionedBeaconState) (int, error) {
var stateObj sszutils.FastsszMarshaler

switch beaconState.Version {
case spec.DataVersionPhase0:
stateObj = beaconState.Phase0
case spec.DataVersionAltair:
stateObj = beaconState.Altair
case spec.DataVersionBellatrix:
stateObj = beaconState.Bellatrix
case spec.DataVersionCapella:
stateObj = beaconState.Capella
case spec.DataVersionDeneb:
stateObj = beaconState.Deneb
case spec.DataVersionElectra:
stateObj = beaconState.Electra
case spec.DataVersionFulu:
stateObj = beaconState.Fulu
default:
return 0, errors.New("unknown state version")
}

return stateObj.SizeSSZ(), nil
}
Loading
Loading