Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ linters:
- nosprintfhostport # checks for misuse of Sprintf to construct a host with port in a URL
- perfsprint # checks that fmt.Sprintf can be replaced with a faster alternative
- predeclared # finds code that shadows one of Go's predeclared identifiers
- promlinter # checks Prometheus metrics naming via promlint
#- promlinter # checks Prometheus metrics naming via promlint
- protogetter # reports direct reads from proto message fields when getters should be used
- reassign # checks that package variables are not reassigned
- revive # fast, configurable, extensible, flexible, and beautiful linter for Go, drop-in replacement of golint
Expand Down Expand Up @@ -373,4 +373,4 @@ issues:
- thelper
exclude-files:
- "pkg/cometbft/cli/.*\\.go"
- "pkg/cometbft/service/server/.*\\.go"
- "pkg/cometbft/service/server/.*\\.go"
6 changes: 2 additions & 4 deletions beacon/blockchain/blob_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type blobFetcher struct {
queue *blobQueue // Queue for persistent requests
executor *blobFetchExecutor // Executor for fetch logic
config BlobFetcherConfig // Configuration
metrics *blobFetcherMetrics
metrics *BlobFetcherMetrics

// We need to track current head slot so we know when blob download requests need to be pruned as they are outside the WithinDAPeriod
headSlotMu sync.RWMutex
Expand All @@ -80,10 +80,8 @@ func NewBlobFetcher(
storageBackend StorageBackend,
chainSpec BlobFetcherChainSpec,
config BlobFetcherConfig,
telemetrySink TelemetrySink,
metrics *BlobFetcherMetrics,
) (BlobFetcher, error) {
metrics := newBlobFetcherMetrics(telemetrySink)

queue, err := newBlobQueue(filepath.Join(dataDir, "blobs", "download_queue"), logger, metrics)
if err != nil {
return nil, err
Expand Down
77 changes: 61 additions & 16 deletions beacon/blockchain/blob_fetcher_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,44 +20,89 @@

package blockchain

import (
"github.com/berachain/beacon-kit/observability/metrics"
)

// Metric reason constants for blob fetcher.
const (
expiredReasonOutsideDA = "outside_da_period"
expiredReasonMaxRetries = "max_retries"
)

// blobFetcherMetrics contains metrics for the blob fetcher queue and retry operations.
type blobFetcherMetrics struct {
sink TelemetrySink
// BlobFetcherMetrics contains metrics for the blob fetcher queue and retry operations.
type BlobFetcherMetrics struct {
RetriesTotal metrics.Counter
RequestsExpiredTotal metrics.Counter
RequestsCompletedTotal metrics.Counter
RequestsQueuedTotal metrics.Counter
QueueDepth metrics.Gauge
}

// newBlobFetcherMetrics creates a new blobFetcherMetrics instance.
func newBlobFetcherMetrics(sink TelemetrySink) *blobFetcherMetrics {
return &blobFetcherMetrics{sink: sink}
// NewBlobFetcherMetrics returns a new BlobFetcherMetrics instance with metrics from the provided factory.
// Metric names are kept identical to cosmos-sdk/telemetry output for Grafana compatibility.
func NewBlobFetcherMetrics(factory metrics.Factory) *BlobFetcherMetrics {
return &BlobFetcherMetrics{
RetriesTotal: factory.NewCounter(
metrics.CounterOpts{
Name: "beacon_kit_blob_fetcher_retries_total",
Help: "Number of times a blob request was retried after failure",
},
nil,
),
RequestsExpiredTotal: factory.NewCounter(
metrics.CounterOpts{
Name: "beacon_kit_blob_fetcher_requests_expired_total",
Help: "Number of blob fetch requests that expired before completion",
},
[]string{"reason"},
),
RequestsCompletedTotal: factory.NewCounter(
metrics.CounterOpts{
Name: "beacon_kit_blob_fetcher_requests_completed_total",
Help: "Number of blob fetch requests that completed successfully",
},
nil,
),
RequestsQueuedTotal: factory.NewCounter(
metrics.CounterOpts{
Name: "beacon_kit_blob_fetcher_requests_queued_total",
Help: "Number of new blob fetch requests added to the queue",
},
nil,
),
QueueDepth: factory.NewGauge(
metrics.GaugeOpts{
Name: "beacon_kit_blob_fetcher_queue_depth",
Help: "Current depth of the blob fetcher queue",
},
nil,
),
}
}

// recordRetry increments counter when a blob request is retried after failure.
func (m *blobFetcherMetrics) recordRetry() {
m.sink.IncrementCounter("beacon_kit.blob_fetcher.retries_total")
func (m *BlobFetcherMetrics) recordRetry() {
m.RetriesTotal.Add(1)
}

// recordRequestExpired increments counter when request expires before completion.
// Reason: "outside_da_period", "max_retries"
func (m *blobFetcherMetrics) recordRequestExpired(reason string) {
m.sink.IncrementCounter("beacon_kit.blob_fetcher.requests_expired_total", "reason", reason)
func (m *BlobFetcherMetrics) recordRequestExpired(reason string) {
m.RequestsExpiredTotal.With("reason", reason).Add(1)
}

// recordRequestComplete increments counter when request completes successfully.
func (m *blobFetcherMetrics) recordRequestComplete() {
m.sink.IncrementCounter("beacon_kit.blob_fetcher.requests_completed_total")
func (m *BlobFetcherMetrics) recordRequestComplete() {
m.RequestsCompletedTotal.Add(1)
}

// recordRequestQueued increments counter when a new request is added to queue.
func (m *blobFetcherMetrics) recordRequestQueued() {
m.sink.IncrementCounter("beacon_kit.blob_fetcher.requests_queued_total")
func (m *BlobFetcherMetrics) recordRequestQueued() {
m.RequestsQueuedTotal.Add(1)
}

// setQueueDepth sets the current depth of the blob fetcher queue.
func (m *blobFetcherMetrics) setQueueDepth(depth int) {
m.sink.SetGauge("beacon_kit.blob_fetcher.queue_depth", int64(depth))
func (m *BlobFetcherMetrics) setQueueDepth(depth int) {
m.QueueDepth.Set(float64(depth))
}
4 changes: 2 additions & 2 deletions beacon/blockchain/blob_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ type BlobFetchRequest struct {
type blobQueue struct {
queueDir string
logger log.Logger
metrics *blobFetcherMetrics
metrics *BlobFetcherMetrics
}

// newBlobQueue creates a new blob queue with the given directory.
// It creates the directory if it doesn't exist and cleans up orphaned temp files.
func newBlobQueue(queueDir string, logger log.Logger, metrics *blobFetcherMetrics) (*blobQueue, error) {
func newBlobQueue(queueDir string, logger log.Logger, metrics *BlobFetcherMetrics) (*blobQueue, error) {
// Create queue directory
if err := os.MkdirAll(queueDir, 0750); err != nil {
return nil, fmt.Errorf("failed to create blob download queue directory: %w", err)
Expand Down
18 changes: 9 additions & 9 deletions beacon/blockchain/blob_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

"cosmossdk.io/log"
ctypes "github.com/berachain/beacon-kit/consensus-types/types"
"github.com/berachain/beacon-kit/node-core/components/metrics"
"github.com/berachain/beacon-kit/observability/metrics/discard"
"github.com/berachain/beacon-kit/primitives/common"
"github.com/berachain/beacon-kit/primitives/eip4844"
"github.com/berachain/beacon-kit/primitives/math"
Expand All @@ -47,7 +47,7 @@ func createTestBlobRequest(slot math.Slot, blobCount int) BlobFetchRequest {
func TestBlobQueue_SuccessfulWrite(t *testing.T) {
t.Parallel()
tmpDir := t.TempDir()
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), newBlobFetcherMetrics(metrics.NewNoOpTelemetrySink()))
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), NewBlobFetcherMetrics(discard.NewFactory()))
require.NoError(t, err)

slot := math.Slot(100)
Expand All @@ -71,7 +71,7 @@ func TestBlobQueue_SuccessfulWrite(t *testing.T) {
func TestBlobQueue_RetryLogic(t *testing.T) {
t.Parallel()
tmpDir := t.TempDir()
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), newBlobFetcherMetrics(metrics.NewNoOpTelemetrySink()))
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), NewBlobFetcherMetrics(discard.NewFactory()))
require.NoError(t, err)

withinDA := func(_, _ math.Slot) bool { return true }
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestBlobQueue_RetryLogic(t *testing.T) {
func TestBlobQueue_AvailabilityWindow(t *testing.T) {
t.Parallel()
tmpDir := t.TempDir()
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), newBlobFetcherMetrics(metrics.NewNoOpTelemetrySink()))
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), NewBlobFetcherMetrics(discard.NewFactory()))
require.NoError(t, err)

// Add old request
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestBlobQueue_AvailabilityWindow(t *testing.T) {
func TestBlobQueue_UpdateRetry(t *testing.T) {
t.Parallel()
tmpDir := t.TempDir()
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), newBlobFetcherMetrics(metrics.NewNoOpTelemetrySink()))
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), NewBlobFetcherMetrics(discard.NewFactory()))
require.NoError(t, err)

request := createTestBlobRequest(math.Slot(100), 2)
Expand All @@ -157,7 +157,7 @@ func TestBlobQueue_UpdateRetry(t *testing.T) {
func TestBlobQueue_ProcessingOrder(t *testing.T) {
t.Parallel()
tmpDir := t.TempDir()
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), newBlobFetcherMetrics(metrics.NewNoOpTelemetrySink()))
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), NewBlobFetcherMetrics(discard.NewFactory()))
require.NoError(t, err)

withinDA := func(_, _ math.Slot) bool { return true }
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestBlobQueue_ProcessingOrder(t *testing.T) {
func TestBlobQueue_MaxRetryLimit(t *testing.T) {
t.Parallel()
tmpDir := t.TempDir()
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), newBlobFetcherMetrics(metrics.NewNoOpTelemetrySink()))
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), NewBlobFetcherMetrics(discard.NewFactory()))
require.NoError(t, err)

withinDA := func(_, _ math.Slot) bool { return true }
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestBlobQueue_MaxRetryLimit(t *testing.T) {
func TestBlobQueue_UnderRetryLimit(t *testing.T) {
t.Parallel()
tmpDir := t.TempDir()
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), newBlobFetcherMetrics(metrics.NewNoOpTelemetrySink()))
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), NewBlobFetcherMetrics(discard.NewFactory()))
require.NoError(t, err)

withinDA := func(_, _ math.Slot) bool { return true }
Expand Down Expand Up @@ -245,7 +245,7 @@ func TestBlobQueue_UnderRetryLimit(t *testing.T) {
func TestBlobQueue_CorruptedFileHandling(t *testing.T) {
t.Parallel()
tmpDir := t.TempDir()
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), newBlobFetcherMetrics(metrics.NewNoOpTelemetrySink()))
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), NewBlobFetcherMetrics(discard.NewFactory()))
require.NoError(t, err)

withinDA := func(_, _ math.Slot) bool { return true }
Expand Down
12 changes: 2 additions & 10 deletions beacon/blockchain/deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ func (s *Service) fetchAndStoreDeposits(
deposits, err := s.depositContract.ReadDeposits(ctx, blockNum, blockNum)
if err != nil {
s.logger.Error("Failed to read deposits", "error", err)
s.metrics.sink.IncrementCounter(
"beacon_kit.execution.deposit.failed_to_get_block_logs",
"block_num",
blockNumStr,
)
s.metrics.FailedToGetBlockLogs.With("block_num", blockNumStr).Add(1)
s.failedBlocksMu.Lock()
s.failedBlocks[blockNum] = struct{}{}
s.failedBlocksMu.Unlock()
Expand All @@ -79,11 +75,7 @@ func (s *Service) fetchAndStoreDeposits(

if err = s.storageBackend.DepositStore().EnqueueDeposits(ctx, deposits); err != nil {
s.logger.Error("Failed to store deposits", "error", err)
s.metrics.sink.IncrementCounter(
"beacon_kit.execution.deposit.failed_to_enqueue_deposits",
"block_num",
blockNumStr,
)
s.metrics.FailedToEnqueueDeposits.With("block_num", blockNumStr).Add(1)
s.failedBlocksMu.Lock()
s.failedBlocks[blockNum] = struct{}{}
s.failedBlocksMu.Unlock()
Expand Down
15 changes: 0 additions & 15 deletions beacon/blockchain/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package blockchain

import (
"context"
"time"

"github.com/berachain/beacon-kit/chain"
ctypes "github.com/berachain/beacon-kit/consensus-types/types"
Expand Down Expand Up @@ -133,20 +132,6 @@ type StorageBackend interface {
BlockStore() *block.KVStore[*ctypes.BeaconBlock]
}

// TelemetrySink is an interface for sending metrics to a telemetry backend.
type TelemetrySink interface {
// IncrementCounter increments the counter identified by
// the provided key.
IncrementCounter(key string, args ...string)

// SetGauge sets a gauge metric to the specified value.
SetGauge(key string, value int64, args ...string)

// MeasureSince measures the time since the provided start time,
// identified by the provided keys.
MeasureSince(key string, start time.Time, args ...string)
}

//nolint:revive // its ok
type BlockchainI interface {
ProcessGenesisData(
Expand Down
Loading
Loading