Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions metrics/exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var (
FlashblocksReceiveFailureCount otelapi.Int64Counter
FlashblocksReceiveSuccessCount otelapi.Int64Counter
FlashblocksSkipped otelapi.Int64Counter
FlashblocksStreamUp otelapi.Int64Gauge

FlashtestationsLandedCount otelapi.Int64Gauge
FlashtestationsMissedCount otelapi.Int64Gauge
Expand Down Expand Up @@ -70,6 +71,7 @@ var (
setupFlashblocksReceiveFailureCount,
setupFlashblocksReceiveSuccessCount,
setupFlashblocksSkipped,
setupFlashblocksStreamUp,

setupFlashtestationsLandedCount,
setupFlashtestationsMissedCount,
Expand Down
11 changes: 11 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,17 @@ func setupFlashblocksSkipped(ctx context.Context, _ *config.ProbeTx) error {
return nil
}

// setupFlashblocksStreamUp creates the "flashblocks_stream_up" Int64 gauge on
// the package meter and publishes it through FlashblocksStreamUp.
//
// The gauge is only assigned when creation succeeds; any error reported by the
// meter is returned to the caller unchanged. Neither the context nor the probe
// transaction config is consulted.
func setupFlashblocksStreamUp(ctx context.Context, _ *config.ProbeTx) error {
	gauge, err := meter.Int64Gauge(
		"flashblocks_stream_up",
		otelapi.WithDescription("whether the flashblocks stream is up and connected (1 = up, 0 = down)"),
	)
	if err == nil {
		FlashblocksStreamUp = gauge
	}
	return err
}

func setupFlashtestationsLandedCount(ctx context.Context, _ *config.ProbeTx) error {
m, err := meter.Int64Gauge("flashtestations_landed_count",
otelapi.WithDescription("flashtestations landed by our builder"),
Expand Down
85 changes: 82 additions & 3 deletions server/l2/flashblocks_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,13 @@ func (fm *FlashblocksMonitor) Run(ctx context.Context) *<-chan *flashblockEvent
processingContext, cancel := context.WithCancel(processingContext)
fm.stop = cancel

fm.initializeMetricsFlashblocksStreams(ctx)

for stream, url := range fm.cfg.privateStreams {
fm.readStream(ctx, stream, url, fm.flashblocksPrivate)
fm.readStream(ctx, stream, url, "private", fm.flashblocksPrivate)
}
for stream, url := range fm.cfg.publicStreams {
fm.readStream(ctx, stream, url, fm.flashblocksPublic)
fm.readStream(ctx, stream, url, "public", fm.flashblocksPublic)
Comment on lines 100 to +109

Copilot AI Feb 10, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code sets fm.stop to cancel processingContext, but initializeMetricsFlashblocksStreams and the stream goroutines are started with ctx instead of processingContext. If fm.stop() is called, processingContext will be canceled but the streams may keep running (and continue recording metrics) because they’re not observing the canceled context. Pass processingContext to initializeMetricsFlashblocksStreams(...) and to readStream(...) so shutdown behaves as intended.

Copilot uses AI. Check for mistakes.
}

flashblocks := make(chan *flashblockEvent, 2*fm.cfg.flashblocksPerBlock)
Expand Down Expand Up @@ -135,7 +137,7 @@ func (fm *FlashblocksMonitor) Observe(_ context.Context, o otelapi.Observer) err

func (fm *FlashblocksMonitor) readStream(
ctx context.Context,
streamID, streamUrl string,
streamID, streamUrl, streamType string,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

praise:

stream type is useful, thanks

flashblocks chan<- *flashblockEvent,
) {
go func() {
Expand Down Expand Up @@ -167,6 +169,13 @@ func (fm *FlashblocksMonitor) readStream(
metrics.FlashblocksReceiveFailureCount.Add(ctx, 1, otelapi.WithAttributes(
attribute.KeyValue{Key: "kind", Value: attribute.StringValue("l2")},
attribute.KeyValue{Key: "stream", Value: attribute.StringValue(streamID)},
attribute.KeyValue{Key: "stream_type", Value: attribute.StringValue(streamType)},
attribute.KeyValue{Key: "network_id", Value: attribute.Int64Value(fm.cfg.networkID)},
))
metrics.FlashblocksStreamUp.Record(ctx, 0, otelapi.WithAttributes(
attribute.KeyValue{Key: "kind", Value: attribute.StringValue("l2")},
attribute.KeyValue{Key: "stream", Value: attribute.StringValue(streamID)},
attribute.KeyValue{Key: "stream_type", Value: attribute.StringValue(streamType)},
attribute.KeyValue{Key: "network_id", Value: attribute.Int64Value(fm.cfg.networkID)},
))
l.Warn("Failed to connect to flashblocks stream",
Expand All @@ -184,6 +193,14 @@ func (fm *FlashblocksMonitor) readStream(
backoff = wsBackoffMin
conn = _conn
doneReceiving = doneDialling

// connection successful - set stream_up to 1

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question:

Under which conditions does the gauge go from 1 to 0?

In what way is it better or more reliable than, for example:

increase(chain_monitor_flashblocks_receive_success_count_total{}[1m]) > 0

?

metrics.FlashblocksStreamUp.Record(ctx, 1, otelapi.WithAttributes(
attribute.KeyValue{Key: "kind", Value: attribute.StringValue("l2")},
attribute.KeyValue{Key: "stream", Value: attribute.StringValue(streamID)},
attribute.KeyValue{Key: "stream_type", Value: attribute.StringValue(streamType)},
attribute.KeyValue{Key: "network_id", Value: attribute.Int64Value(fm.cfg.networkID)},
))
}

{ // receive
Expand All @@ -194,6 +211,13 @@ func (fm *FlashblocksMonitor) readStream(
metrics.FlashblocksReceiveFailureCount.Add(ctx, 1, otelapi.WithAttributes(
attribute.KeyValue{Key: "kind", Value: attribute.StringValue("l2")},
attribute.KeyValue{Key: "stream", Value: attribute.StringValue(streamID)},
attribute.KeyValue{Key: "stream_type", Value: attribute.StringValue(streamType)},
attribute.KeyValue{Key: "network_id", Value: attribute.Int64Value(fm.cfg.networkID)},
))
metrics.FlashblocksStreamUp.Record(ctx, 0, otelapi.WithAttributes(
attribute.KeyValue{Key: "kind", Value: attribute.StringValue("l2")},
attribute.KeyValue{Key: "stream", Value: attribute.StringValue(streamID)},
attribute.KeyValue{Key: "stream_type", Value: attribute.StringValue(streamType)},
attribute.KeyValue{Key: "network_id", Value: attribute.Int64Value(fm.cfg.networkID)},
))
l.Warn("Failed to read message from flashblocks stream",
Expand All @@ -215,6 +239,7 @@ func (fm *FlashblocksMonitor) readStream(
metrics.FlashblocksReceiveFailureCount.Add(ctx, 1, otelapi.WithAttributes(
attribute.KeyValue{Key: "kind", Value: attribute.StringValue("l2")},
attribute.KeyValue{Key: "stream", Value: attribute.StringValue(streamID)},
attribute.KeyValue{Key: "stream_type", Value: attribute.StringValue(streamType)},
attribute.KeyValue{Key: "network_id", Value: attribute.Int64Value(fm.cfg.networkID)},
))
l.Warn("Failed to decompress binary message from flashblocks stream, ignoring...",
Expand All @@ -235,6 +260,7 @@ func (fm *FlashblocksMonitor) readStream(
metrics.FlashblocksReceiveFailureCount.Add(ctx, 1, otelapi.WithAttributes(
attribute.KeyValue{Key: "kind", Value: attribute.StringValue("l2")},
attribute.KeyValue{Key: "stream", Value: attribute.StringValue(streamID)},
attribute.KeyValue{Key: "stream_type", Value: attribute.StringValue(streamType)},
attribute.KeyValue{Key: "network_id", Value: attribute.Int64Value(fm.cfg.networkID)},
))
l.Error("Failed to parse flashblock",
Expand Down Expand Up @@ -438,3 +464,56 @@ func (fm *FlashblocksMonitor) detectInconsistentFlashblocks(ctx context.Context,
))
}
}

func (fm *FlashblocksMonitor) initializeMetricsFlashblocksStreams(ctx context.Context) {
// initialize stream health metrics to 0 for all streams to ensure
// metrics always exist regardless of builder health
Comment thread
KA-ROM marked this conversation as resolved.
Outdated

// FlashblocksStreamUp gauge for private and public streams
for stream := range fm.cfg.privateStreams {
metrics.FlashblocksStreamUp.Record(ctx, 0, otelapi.WithAttributes(
attribute.KeyValue{Key: "kind", Value: attribute.StringValue("l2")},
attribute.KeyValue{Key: "stream", Value: attribute.StringValue(stream)},
attribute.KeyValue{Key: "stream_type", Value: attribute.StringValue("private")},
attribute.KeyValue{Key: "network_id", Value: attribute.Int64Value(fm.cfg.networkID)},
))
}
for stream := range fm.cfg.publicStreams {
metrics.FlashblocksStreamUp.Record(ctx, 0, otelapi.WithAttributes(
attribute.KeyValue{Key: "kind", Value: attribute.StringValue("l2")},
attribute.KeyValue{Key: "stream", Value: attribute.StringValue(stream)},
attribute.KeyValue{Key: "stream_type", Value: attribute.StringValue("public")},
attribute.KeyValue{Key: "network_id", Value: attribute.Int64Value(fm.cfg.networkID)},
))
Comment thread
KA-ROM marked this conversation as resolved.
}

// FlashblocksReceiveSuccess/FailureCount counters for private and public streams
for stream := range fm.cfg.privateStreams {
metrics.FlashblocksReceiveSuccessCount.Add(ctx, 0, otelapi.WithAttributes(
attribute.KeyValue{Key: "kind", Value: attribute.StringValue("l2")},
attribute.KeyValue{Key: "stream", Value: attribute.StringValue(stream)},
attribute.KeyValue{Key: "stream_type", Value: attribute.StringValue("private")},
attribute.KeyValue{Key: "network_id", Value: attribute.Int64Value(fm.cfg.networkID)},
))
metrics.FlashblocksReceiveFailureCount.Add(ctx, 0, otelapi.WithAttributes(
attribute.KeyValue{Key: "kind", Value: attribute.StringValue("l2")},
attribute.KeyValue{Key: "stream", Value: attribute.StringValue(stream)},
attribute.KeyValue{Key: "stream_type", Value: attribute.StringValue("private")},
attribute.KeyValue{Key: "network_id", Value: attribute.Int64Value(fm.cfg.networkID)},
))
}
for stream := range fm.cfg.publicStreams {
metrics.FlashblocksReceiveSuccessCount.Add(ctx, 0, otelapi.WithAttributes(
attribute.KeyValue{Key: "kind", Value: attribute.StringValue("l2")},
attribute.KeyValue{Key: "stream", Value: attribute.StringValue(stream)},
attribute.KeyValue{Key: "stream_type", Value: attribute.StringValue("public")},
attribute.KeyValue{Key: "network_id", Value: attribute.Int64Value(fm.cfg.networkID)},
))
metrics.FlashblocksReceiveFailureCount.Add(ctx, 0, otelapi.WithAttributes(
attribute.KeyValue{Key: "kind", Value: attribute.StringValue("l2")},
attribute.KeyValue{Key: "stream", Value: attribute.StringValue(stream)},
attribute.KeyValue{Key: "stream_type", Value: attribute.StringValue("public")},
attribute.KeyValue{Key: "network_id", Value: attribute.Int64Value(fm.cfg.networkID)},
))
Comment thread
KA-ROM marked this conversation as resolved.
}
}