
Commit 6933f03

derrandz and renaynay authored
feat(node | das | libs/header/sync): add total uptime node metrics + totalSampled das metrics + totalSynced sync metrics (#1638)
## Overview

This PR introduces node uptime metrics and DAS total-sampled-headers metrics to support calculating the uptime index proposed by mustafa on the monitoring side.

It also introduces a new module named `Telemetry` for node-related telemetry. This module can also host any general telemetry and observability that does not belong to a specific module.

## Changes

- [x] Introduced uptime metrics for the node under `nodebuilder/node/uptime.go`
- [x] Introduced persistent uptime metrics, using the datastore to persist the node start time
- [x] Testing for uptime metrics persistence using the store
- [x] Unit testing for uptime metrics
- [x] Integration testing for uptime metrics
- [ ] e2e testing for uptime metrics

## Checklist

- [x] New and updated code has appropriate documentation
- [x] New and updated code has new and/or updated testing
- [x] Required CI checks are passing
- [x] Visual proof for any user-facing features like CLI or documentation updates
- [ ] Linked issues closed with keywords

## Blocked By

PR: #1537

---------

Co-authored-by: rene <[email protected]>
1 parent abb9fa1 commit 6933f03

File tree

14 files changed: +306 -28 lines

das/checkpoint.go (+9)

```diff
@@ -48,3 +48,12 @@ func (c checkpoint) String() string {
     return str
 }
+
+// totalSampled returns the total amount of sampled headers
+func (c checkpoint) totalSampled() uint64 {
+    var totalInProgress uint64
+    for _, w := range c.Workers {
+        totalInProgress += (w.To - w.From) + 1
+    }
+    return c.SampleFrom - totalInProgress - uint64(len(c.Failed))
+}
```
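To make the arithmetic concrete, here is a minimal, self-contained sketch. The `checkpoint` and `workerCheckpoint` stand-ins below, and the field values, are invented for illustration and only mirror the shape the method needs: every height below `SampleFrom` counts as sampled, minus the heights still assigned to in-flight workers and the heights recorded as failed.

```go
package main

import "fmt"

// Minimal stand-ins for the das package's checkpoint types,
// defined here only to make the arithmetic runnable in isolation.
type workerCheckpoint struct{ From, To uint64 }

type checkpoint struct {
    SampleFrom uint64
    Failed     map[uint64]int
    Workers    []workerCheckpoint
}

func (c checkpoint) totalSampled() uint64 {
    var totalInProgress uint64
    for _, w := range c.Workers {
        totalInProgress += (w.To - w.From) + 1
    }
    return c.SampleFrom - totalInProgress - uint64(len(c.Failed))
}

func main() {
    cp := checkpoint{
        SampleFrom: 1000,                         // next height to sample from
        Failed:     map[uint64]int{42: 1, 97: 2}, // 2 heights recorded as failed
        Workers: []workerCheckpoint{
            {From: 990, To: 999}, // 10 heights still in flight
        },
    }
    // 1000 - 10 - 2 = 988 headers counted as sampled
    fmt.Println(cp.totalSampled())
}
```

As the das/coordinator.go change below shows, this value seeds the in-memory counter when sampling resumes, so the gauge picks up roughly where it left off instead of restarting from zero after a restart.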

das/coordinator.go (+4)

```diff
@@ -55,6 +55,10 @@ func newSamplingCoordinator(

 func (sc *samplingCoordinator) run(ctx context.Context, cp checkpoint) {
     sc.state.resumeFromCheckpoint(cp)
+
+    // the amount of sampled headers from the last checkpoint
+    sc.metrics.recordTotalSampled(cp.totalSampled())
+
     // resume workers
     for _, wk := range cp.Workers {
         sc.runWorker(ctx, sc.state.newJob(wk.From, wk.To))
```

das/metrics.go (+52 -8)

```diff
@@ -24,7 +24,9 @@ type metrics struct {
     sampleTime    syncfloat64.Histogram
     getHeaderTime syncfloat64.Histogram
     newHead       syncint64.Counter
-    lastSampledTS int64
+
+    lastSampledTS   uint64
+    totalSampledInt uint64
 }

 func (d *DASer) InitMetrics() error {
@@ -76,6 +78,16 @@
         return err
     }

+    totalSampled, err := meter.
+        AsyncInt64().
+        Gauge(
+            "das_total_sampled_headers",
+            instrument.WithDescription("total sampled headers gauge"),
+        )
+    if err != nil {
+        return err
+    }
+
     d.sampler.metrics = &metrics{
         sampled:    sampled,
         sampleTime: sampleTime,
@@ -85,7 +97,11 @@

     err = meter.RegisterCallback(
         []instrument.Asynchronous{
-            lastSampledTS, busyWorkers, networkHead, sampledChainHead,
+            lastSampledTS,
+            busyWorkers,
+            networkHead,
+            sampledChainHead,
+            totalSampled,
         },
         func(ctx context.Context) {
             stats, err := d.sampler.stats(ctx)
@@ -97,9 +113,12 @@
             networkHead.Observe(ctx, int64(stats.NetworkHead))
             sampledChainHead.Observe(ctx, int64(stats.SampledChainHead))

-            if ts := atomic.LoadInt64(&d.sampler.metrics.lastSampledTS); ts != 0 {
-                lastSampledTS.Observe(ctx, ts)
+            if ts := atomic.LoadUint64(&d.sampler.metrics.lastSampledTS); ts != 0 {
+                lastSampledTS.Observe(ctx, int64(ts))
             }
+
+            totalSampledInt := atomic.LoadUint64(&d.sampler.metrics.totalSampledInt)
+            totalSampled.Observe(ctx, int64(totalSampledInt))
         },
     )

@@ -110,29 +129,54 @@
     return nil
 }

-func (m *metrics) observeSample(ctx context.Context, h *header.ExtendedHeader, sampleTime time.Duration, err error) {
+// observeSample records the time it took to sample a header +
+// the amount of sampled contiguous headers
+func (m *metrics) observeSample(
+    ctx context.Context,
+    h *header.ExtendedHeader,
+    sampleTime time.Duration,
+    err error,
+) {
     if m == nil {
         return
     }
     m.sampleTime.Record(ctx, sampleTime.Seconds(),
         attribute.Bool("failed", err != nil),
-        attribute.Int("header_width", len(h.DAH.RowsRoots)))
+        attribute.Int("header_width", len(h.DAH.RowsRoots)),
+    )
+
     m.sampled.Add(ctx, 1,
         attribute.Bool("failed", err != nil),
-        attribute.Int("header_width", len(h.DAH.RowsRoots)))
-    atomic.StoreInt64(&m.lastSampledTS, time.Now().UTC().Unix())
+        attribute.Int("header_width", len(h.DAH.RowsRoots)),
+    )
+
+    atomic.StoreUint64(&m.lastSampledTS, uint64(time.Now().UTC().Unix()))
+
+    if err == nil {
+        atomic.AddUint64(&m.totalSampledInt, 1)
+    }
 }

+// observeGetHeader records the time it took to get a header from the header store.
 func (m *metrics) observeGetHeader(ctx context.Context, d time.Duration) {
     if m == nil {
         return
     }
     m.getHeaderTime.Record(ctx, d.Seconds())
 }

+// observeNewHead records the network head.
 func (m *metrics) observeNewHead(ctx context.Context) {
     if m == nil {
         return
     }
     m.newHead.Add(ctx, 1)
 }
+
+// recordTotalSampled records the total sampled headers.
+func (m *metrics) recordTotalSampled(totalSampled uint64) {
+    if m == nil {
+        return
+    }
+    atomic.StoreUint64(&m.totalSampledInt, totalSampled)
+}
```
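The new `das_total_sampled_headers` instrument follows the same pattern as the other DAS metrics in this file: mutate an atomic counter on the hot path, and only read it inside the asynchronous meter callback. Below is a minimal, standalone sketch of that pattern against the OpenTelemetry metric API pinned in go.mod (metric v0.34.x, otel v1.11.x); the package-level variable and helper function are illustrative, not part of the module.

```go
package main

import (
    "context"
    "sync/atomic"

    "go.opentelemetry.io/otel/metric/global"
    "go.opentelemetry.io/otel/metric/instrument"
)

var totalSampled uint64 // bumped with atomic.AddUint64 on the hot path

func registerTotalSampledGauge() error {
    meter := global.MeterProvider().Meter("das")

    gauge, err := meter.AsyncInt64().Gauge(
        "das_total_sampled_headers",
        instrument.WithDescription("total sampled headers gauge"),
    )
    if err != nil {
        return err
    }

    // The callback runs on every metric collection; it reads the atomic
    // counter instead of taking locks on the sampling path.
    return meter.RegisterCallback(
        []instrument.Asynchronous{gauge},
        func(ctx context.Context) {
            gauge.Observe(ctx, int64(atomic.LoadUint64(&totalSampled)))
        },
    )
}

func main() {
    if err := registerTotalSampledGauge(); err != nil {
        panic(err)
    }
    atomic.AddUint64(&totalSampled, 1) // simulate a successful sample
}
```

Using an asynchronous gauge keeps the sampling loop free of metric-export work: the value is only pulled when the exporter collects.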

das/options.go (+2 -1)

```diff
@@ -38,7 +38,8 @@ type Parameters struct {
     // SampleFrom is the height sampling will start from if no previous checkpoint was saved
     SampleFrom uint64

-    // SampleTimeout is a maximum amount time sampling of single block may take until it will be canceled
+    // SampleTimeout is a maximum amount time sampling of single block may take until it will be
+    // canceled
     SampleTimeout time.Duration
 }
```

das/worker.go (+32 -8)

```diff
@@ -60,8 +60,15 @@ func (w *worker) run(
         }

         metrics.observeGetHeader(ctx, time.Since(startGet))
-        log.Debugw("got header from header store", "height", h.Height(), "hash", h.Hash(),
-            "square width", len(h.DAH.RowsRoots), "data root", h.DAH.Hash(), "finished (s)", time.Since(startGet))
+
+        log.Debugw(
+            "got header from header store",
+            "height", h.Height(),
+            "hash", h.Hash(),
+            "square width", len(h.DAH.RowsRoots),
+            "data root", h.DAH.Hash(),
+            "finished (s)", time.Since(startGet),
+        )

         startSample := time.Now()
         err = sample(ctx, h)
@@ -72,18 +79,35 @@
         w.setResult(curr, err)
         metrics.observeSample(ctx, h, time.Since(startSample), err)
         if err != nil {
-            log.Debugw("failed to sampled header", "height", h.Height(), "hash", h.Hash(),
-                "square width", len(h.DAH.RowsRoots), "data root", h.DAH.Hash(), "err", err)
+            log.Debugw(
+                "failed to sampled header",
+                "height", h.Height(),
+                "hash", h.Hash(),
+                "square width", len(h.DAH.RowsRoots),
+                "data root", h.DAH.Hash(),
+                "err", err,
+            )
         } else {
-            log.Debugw("sampled header", "height", h.Height(), "hash", h.Hash(),
-                "square width", len(h.DAH.RowsRoots), "data root", h.DAH.Hash(), "finished (s)", time.Since(startSample))
+            log.Debugw(
+                "sampled header",
+                "height", h.Height(),
+                "hash", h.Hash(),
+                "square width", len(h.DAH.RowsRoots),
+                "data root", h.DAH.Hash(),
+                "finished (s)", time.Since(startSample),
+            )
         }
     }

     if w.state.Curr > w.state.From {
         jobTime := time.Since(jobStart)
-        log.Infow("sampled headers", "from", w.state.From, "to", w.state.Curr,
-            "finished (s)", jobTime.Seconds())
+        log.Infow(
+            "sampled headers",
+            "from", w.state.From,
+            "to", w.state.Curr,
+            "finished (s)",
+            jobTime.Seconds(),
+        )
     }

     select {
```

go.mod (+2 -2)

```diff
@@ -62,12 +62,14 @@ require (
     go.opentelemetry.io/otel/sdk v1.11.2
     go.opentelemetry.io/otel/sdk/metric v0.34.0
     go.opentelemetry.io/otel/trace v1.11.2
+    go.opentelemetry.io/proto/otlp v0.19.0
     go.uber.org/fx v1.18.2
     go.uber.org/multierr v1.9.0
     golang.org/x/crypto v0.5.0
     golang.org/x/sync v0.1.0
     golang.org/x/text v0.6.0
     google.golang.org/grpc v1.52.0
+    google.golang.org/protobuf v1.28.2-0.20220831092852-f930b1dc76e8
 )

 require (
@@ -296,7 +298,6 @@
     go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 // indirect
     go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.34.0 // indirect
     go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 // indirect
-    go.opentelemetry.io/proto/otlp v0.19.0 // indirect
     go.uber.org/atomic v1.10.0 // indirect
     go.uber.org/dig v1.15.0 // indirect
     go.uber.org/zap v1.24.0 // indirect
@@ -311,7 +312,6 @@
     google.golang.org/api v0.102.0 // indirect
     google.golang.org/appengine v1.6.7 // indirect
     google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect
-    google.golang.org/protobuf v1.28.2-0.20220831092852-f930b1dc76e8 // indirect
     gopkg.in/ini.v1 v1.67.0 // indirect
     gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
     gopkg.in/yaml.v2 v2.4.0 // indirect
```

header/metrics.go (+6 -3)

```diff
@@ -9,12 +9,13 @@ import (
     "go.opentelemetry.io/otel/metric/unit"

     libhead "github.com/celestiaorg/celestia-node/libs/header"
+    "github.com/celestiaorg/celestia-node/libs/header/sync"
 )

 var meter = global.MeterProvider().Meter("header")

-// WithMetrics enables Otel metrics to monitor head.
-func WithMetrics(store libhead.Store[*ExtendedHeader]) {
+// WithMetrics enables Otel metrics to monitor head and total amount of synced headers.
+func WithMetrics(store libhead.Store[*ExtendedHeader], syncer *sync.Syncer[*ExtendedHeader]) error {
     headC, _ := meter.AsyncInt64().Counter(
         "head",
         instrument.WithUnit(unit.Dimensionless),
@@ -40,6 +41,8 @@ func WithMetrics(store libhead.Store[*ExtendedHeader]) {
         },
     )
     if err != nil {
-        panic(err)
+        return err
     }
+
+    return syncer.InitMetrics()
 }
```
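Because `WithMetrics` now returns an error (and also initializes the syncer's metrics) instead of panicking, call sites must pass the Syncer and handle the result. A hedged sketch of a caller after this change; the helper function and its error wrapping are invented for illustration, and only the `header.WithMetrics` signature comes from this diff:

```go
package headermetrics

import (
    "fmt"

    "github.com/celestiaorg/celestia-node/header"
    libhead "github.com/celestiaorg/celestia-node/libs/header"
    "github.com/celestiaorg/celestia-node/libs/header/sync"
)

// enableHeaderMetrics is a hypothetical wrapper around header.WithMetrics.
func enableHeaderMetrics(
    store libhead.Store[*header.ExtendedHeader],
    syncer *sync.Syncer[*header.ExtendedHeader],
) error {
    // WithMetrics registers the head counter and, via syncer.InitMetrics(),
    // the total_synced_headers gauge.
    if err := header.WithMetrics(store, syncer); err != nil {
        return fmt.Errorf("header: enabling metrics: %w", err)
    }
    return nil
}
```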

libs/header/sync/metrics.go (+43)

```diff
@@ -0,0 +1,43 @@
+package sync
+
+import (
+    "context"
+    "sync/atomic"
+
+    "go.opentelemetry.io/otel/metric/global"
+    "go.opentelemetry.io/otel/metric/instrument"
+)
+
+var meter = global.MeterProvider().Meter("header/sync")
+
+type metrics struct {
+    totalSynced int64
+}
+
+func (s *Syncer[H]) InitMetrics() error {
+    s.metrics = &metrics{}
+
+    totalSynced, err := meter.
+        AsyncFloat64().
+        Gauge(
+            "total_synced_headers",
+            instrument.WithDescription("total synced headers"),
+        )
+    if err != nil {
+        return err
+    }
+
+    return meter.RegisterCallback(
+        []instrument.Asynchronous{
+            totalSynced,
+        },
+        func(ctx context.Context) {
+            totalSynced.Observe(ctx, float64(atomic.LoadInt64(&s.metrics.totalSynced)))
+        },
+    )
+}
+
+// recordTotalSynced records the total amount of synced headers.
+func (m *metrics) recordTotalSynced(totalSynced int) {
+    atomic.AddInt64(&m.totalSynced, int64(totalSynced))
+}
```
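Note the different lifecycle from the DAS counter: the DAS total is re-seeded from the checkpoint on startup, while `totalSynced` starts at zero in `InitMetrics` and accumulates the processed count of each `doSync` batch (see libs/header/sync/sync.go below), so `total_synced_headers` reports headers synced since the current process started. A tiny sketch of that accumulation, using a stand-in struct with the same shape as the metrics type above:

```go
package main

import (
    "fmt"
    "sync/atomic"
)

// stand-in with the same shape as the sync package's new metrics type
type metrics struct{ totalSynced int64 }

func (m *metrics) recordTotalSynced(totalSynced int) {
    atomic.AddInt64(&m.totalSynced, int64(totalSynced))
}

func main() {
    m := &metrics{}
    // doSync reports each processed batch; the gauge callback later reads the sum.
    for _, processed := range []int{512, 512, 137} {
        m.recordTotalSynced(processed)
    }
    fmt.Println(atomic.LoadInt64(&m.totalSynced)) // 1161
}
```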

libs/header/sync/sync.go (+5)

```diff
@@ -53,6 +53,8 @@ type Syncer[H header.Header] struct {
     cancel context.CancelFunc

     Params *Parameters
+
+    metrics *metrics
 }

 // NewSyncer creates a new instance of Syncer.
@@ -233,6 +235,9 @@ func (s *Syncer[H]) doSync(ctx context.Context, fromHead, toHead H) (err error)
         if err != nil && processed == 0 {
             break
         }
+        if s.metrics != nil {
+            s.metrics.recordTotalSynced(processed)
+        }
     }

     s.stateLk.Lock()
```

nodebuilder/node/metrics.go (+53)

```diff
@@ -0,0 +1,53 @@
+package node
+
+import (
+    "context"
+    "time"
+
+    "go.opentelemetry.io/otel/metric/global"
+    "go.opentelemetry.io/otel/metric/instrument"
+)
+
+var meter = global.MeterProvider().Meter("node")
+
+var (
+    timeStarted time.Time
+    nodeStarted bool
+)
+
+// WithMetrics registers node metrics.
+func WithMetrics() error {
+    nodeStartTS, err := meter.
+        AsyncFloat64().
+        Gauge(
+            "node_start_ts",
+            instrument.WithDescription("timestamp when the node was started"),
+        )
+    if err != nil {
+        return err
+    }
+
+    totalNodeRunTime, err := meter.
+        AsyncFloat64().
+        Counter(
+            "node_runtime_counter_in_seconds",
+            instrument.WithDescription("total time the node has been running"),
+        )
+    if err != nil {
+        return err
+    }
+
+    return meter.RegisterCallback(
+        []instrument.Asynchronous{nodeStartTS, totalNodeRunTime},
+        func(ctx context.Context) {
+            if !nodeStarted {
+                // Observe node start timestamp
+                timeStarted = time.Now()
+                nodeStartTS.Observe(ctx, float64(timeStarted.Unix()))
+                nodeStarted = true
+            }
+
+            totalNodeRunTime.Observe(ctx, time.Since(timeStarted).Seconds())
+        },
+    )
+}
```
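The callback above keeps the start time only in memory, while the PR overview also mentions persisting the node start time in the datastore (under `nodebuilder/node/uptime.go`, which is not part of the excerpt shown here). A hedged sketch of what that persistence could look like with go-datastore; the key name, helper, and encoding are assumptions, not the PR's actual code:

```go
package node

import (
    "context"
    "encoding/binary"
    "time"

    "github.com/ipfs/go-datastore"
)

// storeKey is an illustrative key; the key actually used by the PR's
// uptime persistence is not shown in this diff.
var storeKey = datastore.NewKey("node_start_ts")

// loadOrPersistStartTime returns the persisted node start time, writing the
// current time on first run. This is a sketch of the "persist start time in
// the datastore" idea from the PR overview, not the PR's actual implementation.
func loadOrPersistStartTime(ctx context.Context, ds datastore.Datastore) (time.Time, error) {
    raw, err := ds.Get(ctx, storeKey)
    switch err {
    case nil:
        return time.Unix(int64(binary.BigEndian.Uint64(raw)), 0), nil
    case datastore.ErrNotFound:
        now := time.Now()
        buf := make([]byte, 8)
        binary.BigEndian.PutUint64(buf, uint64(now.Unix()))
        return now, ds.Put(ctx, storeKey, buf)
    default:
        return time.Time{}, err
    }
}
```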
