Skip to content

Commit 38eb6e4

Browse files
authored
feat: add metrics to status protocol (#5059)
1 parent 2cc5d53 commit 38eb6e4

File tree

18 files changed

+502
-105
lines changed

18 files changed

+502
-105
lines changed

go.mod

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,32 +32,30 @@ require (
3232
github.com/multiformats/go-multihash v0.2.3
3333
github.com/multiformats/go-multistream v0.5.0
3434
github.com/opentracing/opentracing-go v1.2.0
35-
github.com/prometheus/client_golang v1.18.0
35+
github.com/prometheus/client_golang v1.21.1
3636
github.com/spf13/afero v1.6.0
3737
github.com/spf13/cobra v1.5.0
3838
github.com/spf13/viper v1.7.0
39-
github.com/stretchr/testify v1.8.4
39+
github.com/stretchr/testify v1.10.0
4040
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
4141
github.com/uber/jaeger-client-go v2.24.0+incompatible
4242
github.com/vmihailenco/msgpack/v5 v5.3.4
4343
github.com/wealdtech/go-ens/v3 v3.5.1
4444
gitlab.com/nolash/go-mockbytes v0.0.7
4545
go.uber.org/atomic v1.11.0
4646
go.uber.org/goleak v1.3.0
47-
golang.org/x/crypto v0.23.0
48-
golang.org/x/net v0.25.0
49-
golang.org/x/sync v0.7.0
50-
golang.org/x/sys v0.20.0
51-
golang.org/x/term v0.20.0
47+
golang.org/x/crypto v0.31.0
48+
golang.org/x/net v0.33.0
49+
golang.org/x/sync v0.10.0
50+
golang.org/x/sys v0.28.0
51+
golang.org/x/term v0.27.0
5252
golang.org/x/time v0.5.0
5353
gopkg.in/yaml.v2 v2.4.0
5454
resenje.org/multex v0.1.0
5555
resenje.org/singleflight v0.4.0
5656
resenje.org/web v0.4.3
5757
)
5858

59-
require golang.org/x/exp v0.0.0-20240213143201-ec583247a57a // indirect
60-
6159
require (
6260
github.com/BurntSushi/toml v1.1.0 // indirect
6361
github.com/Microsoft/go-winio v0.6.1 // indirect
@@ -66,7 +64,7 @@ require (
6664
github.com/beorn7/perks v1.0.1 // indirect
6765
github.com/bits-and-blooms/bitset v1.10.0 // indirect
6866
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect
69-
github.com/cespare/xxhash/v2 v2.2.0 // indirect
67+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
7068
github.com/codahale/hdrhistogram v0.0.0-00010101000000-000000000000 // indirect
7169
github.com/consensys/bavard v0.1.13 // indirect
7270
github.com/consensys/gnark-crypto v0.12.1 // indirect
@@ -104,7 +102,7 @@ require (
104102
github.com/ipfs/go-log/v2 v2.5.1 // indirect
105103
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
106104
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
107-
github.com/klauspost/compress v1.17.6 // indirect
105+
github.com/klauspost/compress v1.17.11 // indirect
108106
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
109107
github.com/koron/go-ssdp v0.0.4 // indirect
110108
github.com/leodido/go-urn v1.2.1 // indirect
@@ -132,16 +130,17 @@ require (
132130
github.com/multiformats/go-multibase v0.2.0 // indirect
133131
github.com/multiformats/go-multicodec v0.9.0 // indirect
134132
github.com/multiformats/go-varint v0.0.7 // indirect
133+
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
135134
github.com/nxadm/tail v1.4.8 // indirect
136135
github.com/onsi/ginkgo/v2 v2.15.0 // indirect
137136
github.com/opencontainers/runtime-spec v1.2.0 // indirect
138137
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
139138
github.com/pelletier/go-toml v1.8.0 // indirect
140139
github.com/pkg/errors v0.9.1 // indirect
141140
github.com/pmezard/go-difflib v1.0.0 // indirect
142-
github.com/prometheus/client_model v0.6.0 // indirect
143-
github.com/prometheus/common v0.47.0 // indirect
144-
github.com/prometheus/procfs v0.12.0 // indirect
141+
github.com/prometheus/client_model v0.6.1 // indirect
142+
github.com/prometheus/common v0.62.0
143+
github.com/prometheus/procfs v0.15.1 // indirect
145144
github.com/prometheus/statsd_exporter v0.22.7 // indirect
146145
github.com/quic-go/qpack v0.4.0 // indirect
147146
github.com/quic-go/quic-go v0.42.0 // indirect
@@ -166,10 +165,11 @@ require (
166165
go.uber.org/mock v0.4.0 // indirect
167166
go.uber.org/multierr v1.11.0 // indirect
168167
go.uber.org/zap v1.27.0 // indirect
168+
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a // indirect
169169
golang.org/x/mod v0.17.0 // indirect
170-
golang.org/x/text v0.15.0 // indirect
171-
golang.org/x/tools v0.20.0 // indirect
172-
google.golang.org/protobuf v1.33.0 // indirect
170+
golang.org/x/text v0.21.0 // indirect
171+
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
172+
google.golang.org/protobuf v1.36.1 // indirect
173173
gopkg.in/ini.v1 v1.57.0 // indirect
174174
gopkg.in/yaml.v3 v3.0.1 // indirect
175175
lukechampine.com/blake3 v1.2.1 // indirect

go.sum

Lines changed: 32 additions & 30 deletions
Large diffs are not rendered by default.

pkg/api/api.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,49 @@ func (s *Service) contentLengthMetricMiddleware() func(h http.Handler) http.Hand
506506
}
507507
}
508508

509+
func (s *Service) downloadSpeedMetricMiddleware(endpoint string) func(h http.Handler) http.Handler {
510+
return func(h http.Handler) http.Handler {
511+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
512+
start := time.Now()
513+
h.ServeHTTP(w, r)
514+
515+
rw, ok := w.(*responseWriter)
516+
if !ok {
517+
return
518+
}
519+
if rw.Status() != http.StatusOK {
520+
return
521+
}
522+
523+
speed := float64(rw.size) / time.Since(start).Seconds()
524+
s.metrics.DownloadSpeed.WithLabelValues(endpoint).Observe(speed)
525+
})
526+
}
527+
}
528+
529+
// observeUploadSpeed measures the speed of the upload and sets appropriate
530+
// labels to the metrics. This function can be called as a deferred function in
531+
// side of handler. This functions is not in a form of a middleware to more
532+
// directly pass the deferred flag.
533+
func (s *Service) observeUploadSpeed(w http.ResponseWriter, r *http.Request, start time.Time, endpoint string, deferred bool) {
534+
rw, ok := w.(*responseWriter)
535+
if !ok {
536+
return
537+
}
538+
539+
if rw.Status() != http.StatusOK && rw.Status() != http.StatusCreated {
540+
return
541+
}
542+
543+
mode := "direct"
544+
if deferred {
545+
mode = "deferred"
546+
}
547+
548+
speed := float64(r.ContentLength) / time.Since(start).Seconds()
549+
s.metrics.UploadSpeed.WithLabelValues(endpoint, mode).Observe(speed)
550+
}
551+
509552
// gasConfigMiddleware can be used by the APIs that allow block chain transactions to set
510553
// gas price and gas limit through the HTTP API headers.
511554
func (s *Service) gasConfigMiddleware(handlerName string) func(h http.Handler) http.Handler {

pkg/api/bytes.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"net/http"
1212
"strconv"
13+
"time"
1314

1415
"github.com/ethersphere/bee/v2/pkg/accesscontrol"
1516
"github.com/ethersphere/bee/v2/pkg/cac"
@@ -71,6 +72,8 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
7172
span.SetTag("tagID", tag)
7273
}
7374

75+
defer s.observeUploadSpeed(w, r, time.Now(), "bytes", deferred)
76+
7477
putter, err := s.newStamperPutter(ctx, putterOptions{
7578
BatchID: headers.BatchID,
7679
TagID: tag,

pkg/api/bzz.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
8787
deferred = defaultUploadMethod(headers.Deferred)
8888
)
8989

90+
defer s.observeUploadSpeed(w, r, time.Now(), "bzz", deferred)
91+
9092
if deferred || headers.Pin {
9193
tag, err = s.getOrCreateSessionID(headers.SwarmTag)
9294
if err != nil {

pkg/api/metrics.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ type metrics struct {
2929
ResponseCodeCounts *prometheus.CounterVec
3030

3131
ContentApiDuration prometheus.HistogramVec
32+
UploadSpeed *prometheus.HistogramVec
33+
DownloadSpeed *prometheus.HistogramVec
3234
}
3335

3436
func newMetrics() metrics {
@@ -64,6 +66,20 @@ func newMetrics() metrics {
6466
Help: "Histogram of file upload API response durations.",
6567
Buckets: []float64{0.5, 1, 2.5, 5, 10, 30, 60},
6668
}, []string{"filesize", "method"}),
69+
UploadSpeed: prometheus.NewHistogramVec(prometheus.HistogramOpts{
70+
Namespace: m.Namespace,
71+
Subsystem: subsystem,
72+
Name: "upload_speed",
73+
Help: "Histogram of upload speed in B/s.",
74+
Buckets: []float64{0.25, 0.5, 0.75, 1, 1.25, 1.5, 1.75, 2, 2.5, 3, 4, 5},
75+
}, []string{"endpoint", "mode"}),
76+
DownloadSpeed: prometheus.NewHistogramVec(prometheus.HistogramOpts{
77+
Namespace: m.Namespace,
78+
Subsystem: subsystem,
79+
Name: "download_speed",
80+
Help: "Histogram of download speed in B/s.",
81+
Buckets: []float64{0.5, 1, 1.5, 2, 2.5, 3, 4, 5, 6, 7, 8, 9},
82+
}, []string{"endpoint"}),
6783
}
6884
}
6985

@@ -82,6 +98,14 @@ func (s *Service) Metrics() []prometheus.Collector {
8298
return m.PrometheusCollectorsFromFields(s.metrics)
8399
}
84100

101+
// StatusMetrics exposes metrics that are exposed on the status protocol.
102+
func (s *Service) StatusMetrics() []prometheus.Collector {
103+
return []prometheus.Collector{
104+
s.metrics.UploadSpeed,
105+
s.metrics.DownloadSpeed,
106+
}
107+
}
108+
85109
func (s *Service) pageviewMetricsHandler(h http.Handler) http.Handler {
86110
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
87111
start := time.Now()
@@ -114,18 +138,25 @@ type responseWriter struct {
114138
UpgradedResponseWriter
115139
statusCode int
116140
wroteHeader bool
141+
size int
117142
}
118143

119144
func newResponseWriter(w http.ResponseWriter) *responseWriter {
120145
// StatusOK is called by default if nothing else is called
121146
uw := w.(UpgradedResponseWriter)
122-
return &responseWriter{uw, http.StatusOK, false}
147+
return &responseWriter{uw, http.StatusOK, false, 0}
123148
}
124149

125150
func (rw *responseWriter) Status() int {
126151
return rw.statusCode
127152
}
128153

154+
func (rr *responseWriter) Write(b []byte) (int, error) {
155+
size, err := rr.UpgradedResponseWriter.Write(b)
156+
rr.size += size
157+
return size, err
158+
}
159+
129160
func (rw *responseWriter) WriteHeader(code int) {
130161
if rw.wroteHeader {
131162
return

pkg/api/router.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ func (s *Service) mountAPI() {
243243
handle("/bytes/{address}", jsonhttp.MethodHandler{
244244
"GET": web.ChainHandlers(
245245
s.contentLengthMetricMiddleware(),
246+
s.downloadSpeedMetricMiddleware("bytes"),
246247
s.newTracingHandler("bytes-download"),
247248
s.actDecryptionHandler(),
248249
web.FinalHandlerFunc(s.bytesGetHandler),
@@ -325,6 +326,7 @@ func (s *Service) mountAPI() {
325326
s.contentLengthMetricMiddleware(),
326327
s.newTracingHandler("bzz-download"),
327328
s.actDecryptionHandler(),
329+
s.downloadSpeedMetricMiddleware("bzz"),
328330
web.FinalHandlerFunc(s.bzzDownloadHandler),
329331
),
330332
"HEAD": web.ChainHandlers(

pkg/api/status_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func TestGetStatus(t *testing.T) {
6060
mode.String(),
6161
ssMock,
6262
ssMock,
63+
nil,
6364
)
6465

6566
statusSvc.SetSync(ssMock)
@@ -86,6 +87,7 @@ func TestGetStatus(t *testing.T) {
8687
"",
8788
nil,
8889
nil,
90+
nil,
8991
),
9092
})
9193

pkg/node/node.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ import (
7777
"github.com/ethersphere/bee/v2/pkg/util/syncutil"
7878
"github.com/hashicorp/go-multierror"
7979
ma "github.com/multiformats/go-multiaddr"
80+
"github.com/prometheus/client_golang/prometheus"
8081
promc "github.com/prometheus/client_golang/prometheus"
8182
"golang.org/x/crypto/sha3"
8283
"golang.org/x/sync/errgroup"
@@ -910,7 +911,16 @@ func NewBee(
910911

911912
validStamp := postage.ValidStamp(batchStore)
912913

913-
nodeStatus := status.NewService(logger, p2ps, kad, beeNodeMode.String(), batchStore, localStore)
914+
// metrics exposed on the status protocol
915+
statusMetricsRegistry := prometheus.NewRegistry()
916+
if localStore != nil {
917+
statusMetricsRegistry.MustRegister(localStore.StatusMetrics()...)
918+
}
919+
if p2ps != nil {
920+
statusMetricsRegistry.MustRegister(p2ps.StatusMetrics()...)
921+
}
922+
923+
nodeStatus := status.NewService(logger, p2ps, kad, beeNodeMode.String(), batchStore, localStore, statusMetricsRegistry)
914924
if err = p2ps.AddProtocol(nodeStatus.Protocol()); err != nil {
915925
return nil, fmt.Errorf("status service: %w", err)
916926
}
@@ -968,6 +978,8 @@ func NewBee(
968978
retrieval := retrieval.New(swarmAddress, waitNetworkRFunc, localStore, p2ps, kad, logger, acc, pricer, tracer, o.RetrievalCaching)
969979
localStore.SetRetrievalService(retrieval)
970980

981+
statusMetricsRegistry.MustRegister(retrieval.StatusMetrics()...)
982+
971983
pusherService := pusher.New(networkID, localStore, pushSyncProtocol, batchStore, logger, warmupTime, pusher.DefaultRetryCount)
972984
b.pusherCloser = pusherService
973985

@@ -1196,6 +1208,9 @@ func NewBee(
11961208
apiService.EnableFullAPI()
11971209

11981210
apiService.SetRedistributionAgent(agent)
1211+
1212+
// api metrics are constructed on api.Service.Configure
1213+
statusMetricsRegistry.MustRegister(apiService.StatusMetrics()...)
11991214
}
12001215

12011216
if err := kad.Start(ctx); err != nil {

pkg/p2p/libp2p/libp2p.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,6 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
563563
}
564564

565565
s.host.SetStreamHandlerMatch(id, matcher, func(streamlibp2p network.Stream) {
566-
start := time.Now()
567566
peerID := streamlibp2p.Conn().RemotePeer()
568567
overlay, found := s.peers.overlay(peerID)
569568
if !found {
@@ -581,14 +580,15 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
581580
stream := newStream(streamlibp2p, s.metrics)
582581

583582
// exchange headers
583+
headersStartTime := time.Now()
584584
ctx, cancel := context.WithTimeout(s.ctx, s.HeadersRWTimeout)
585585
defer cancel()
586586
if err := handleHeaders(ctx, ss.Headler, stream, overlay); err != nil {
587587
s.logger.Debug("handle protocol: handle headers failed", "protocol", p.Name, "version", p.Version, "stream", ss.Name, "peer", overlay, "error", err)
588588
_ = stream.Reset()
589589
return
590590
}
591-
s.metrics.HeadersExchangeDuration.Observe(time.Since(start).Seconds())
591+
s.metrics.HeadersExchangeDuration.Observe(time.Since(headersStartTime).Seconds())
592592

593593
ctx, cancel = context.WithCancel(s.ctx)
594594

0 commit comments

Comments
 (0)