Skip to content

Commit d60dda8

Browse files
authored
feat(shwap/bitswap): add blockstore metrics (#3862)
1 parent 68a5e66 commit d60dda8

File tree

5 files changed

+238
-9
lines changed

5 files changed

+238
-9
lines changed

Diff for: nodebuilder/settings.go

+1
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
9595
fx.Invoke(fraud.WithMetrics[*header.ExtendedHeader]),
9696
fx.Invoke(node.WithMetrics),
9797
fx.Invoke(share.WithDiscoveryMetrics),
98+
fx.Invoke(share.WithBlockStoreMetrics),
9899
)
99100

100101
samplingMetrics := fx.Options(

Diff for: nodebuilder/share/bitswap.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -60,21 +60,23 @@ func dataExchange(tp node.Type, params bitSwapParams) exchange.SessionExchange {
6060
}
6161
}
6262

63-
func blockstoreFromDatastore(ds datastore.Batching) (blockstore.Blockstore, error) {
64-
return blockstore.NewBlockstore(ds), nil
63+
func blockstoreFromDatastore(ds datastore.Batching) (*bitswap.BlockstoreWithMetrics, error) {
64+
bs := blockstore.NewBlockstore(ds)
65+
return bitswap.NewBlockstoreWithMetrics(bs)
6566
}
6667

67-
func blockstoreFromEDSStore(store *store.Store, blockStoreCacheSize int) (blockstore.Blockstore, error) {
68+
func blockstoreFromEDSStore(store *store.Store, blockStoreCacheSize int) (*bitswap.BlockstoreWithMetrics, error) {
6869
if blockStoreCacheSize == 0 {
6970
// no cache, return plain blockstore
70-
return &bitswap.Blockstore{Getter: store}, nil
71+
bs := &bitswap.Blockstore{Getter: store}
72+
return bitswap.NewBlockstoreWithMetrics(bs)
7173
}
7274
withCache, err := store.WithCache("blockstore", blockStoreCacheSize)
7375
if err != nil {
7476
return nil, fmt.Errorf("create cached store for blockstore:%w", err)
7577
}
7678
bs := &bitswap.Blockstore{Getter: withCache}
77-
return bs, nil
79+
return bitswap.NewBlockstoreWithMetrics(bs)
7880
}
7981

8082
type bitSwapParams struct {

Diff for: nodebuilder/share/module.go

+17-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/celestiaorg/celestia-node/share/availability/full"
1717
"github.com/celestiaorg/celestia-node/share/availability/light"
1818
"github.com/celestiaorg/celestia-node/share/shwap"
19+
"github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap"
1920
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers"
2021
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
2122
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexeds"
@@ -69,14 +70,26 @@ func bitswapComponents(tp node.Type, cfg *Config) fx.Option {
6970
case node.Light:
7071
return fx.Options(
7172
opts,
72-
fx.Provide(blockstoreFromDatastore),
73+
fx.Provide(
74+
fx.Annotate(
75+
blockstoreFromDatastore,
76+
fx.As(fx.Self()),
77+
fx.As(new(blockstore.Blockstore)),
78+
),
79+
),
7380
)
7481
case node.Full, node.Bridge:
7582
return fx.Options(
7683
opts,
77-
fx.Provide(func(store *store.Store) (blockstore.Blockstore, error) {
78-
return blockstoreFromEDSStore(store, int(cfg.BlockStoreCacheSize))
79-
}),
84+
fx.Provide(
85+
fx.Annotate(
86+
func(store *store.Store) (*bitswap.BlockstoreWithMetrics, error) {
87+
return blockstoreFromEDSStore(store, int(cfg.BlockStoreCacheSize))
88+
},
89+
fx.As(fx.Self()),
90+
fx.As(new(blockstore.Blockstore)),
91+
),
92+
),
8093
)
8194
default:
8295
panic("invalid node type")

Diff for: nodebuilder/share/opts.go

+5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package share
33
import (
44
"errors"
55

6+
"github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap"
67
"github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery"
78
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers"
89
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
@@ -56,3 +57,7 @@ func WithShrexGetterMetrics(sg *shrex_getter.Getter) error {
5657
func WithStoreMetrics(s *store.Store) error {
5758
return s.WithMetrics()
5859
}
60+
61+
func WithBlockStoreMetrics(bs *bitswap.BlockstoreWithMetrics) error {
62+
return bs.WithMetrics()
63+
}

Diff for: share/shwap/p2p/bitswap/blockstore_metrics.go

+208
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
package bitswap
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
bstore "github.com/ipfs/boxo/blockstore"
9+
blocks "github.com/ipfs/go-block-format"
10+
"github.com/ipfs/go-cid"
11+
ipld "github.com/ipfs/go-ipld-format"
12+
"go.opentelemetry.io/otel"
13+
"go.opentelemetry.io/otel/attribute"
14+
"go.opentelemetry.io/otel/metric"
15+
)
16+
17+
const (
18+
notFoundKey = "not_found"
19+
failedKey = "failed"
20+
)
21+
22+
var (
23+
meter = otel.Meter("bitswap_blockstore")
24+
_ bstore.Blockstore = (*BlockstoreWithMetrics)(nil)
25+
)
26+
27+
// BlockstoreWithMetrics is a blockstore that collects metrics on blockstore operations.
28+
type BlockstoreWithMetrics struct {
29+
bs bstore.Blockstore
30+
metrics *metrics
31+
}
32+
33+
type metrics struct {
34+
delete metric.Int64Counter
35+
has metric.Int64Counter
36+
get metric.Int64Counter
37+
getSize metric.Int64Counter
38+
put metric.Int64Counter
39+
putMany metric.Int64Counter
40+
allKeysChan metric.Int64Counter
41+
}
42+
43+
// NewBlockstoreWithMetrics creates a new BlockstoreWithMetrics.
44+
func NewBlockstoreWithMetrics(bs bstore.Blockstore) (*BlockstoreWithMetrics, error) {
45+
return &BlockstoreWithMetrics{
46+
bs: bs,
47+
}, nil
48+
}
49+
50+
// WithMetrics enables metrics collection on the blockstore.
51+
func (w *BlockstoreWithMetrics) WithMetrics() error {
52+
del, err := meter.Int64Counter(
53+
"blockstore_delete_block",
54+
metric.WithDescription("Blockstore delete block operation"),
55+
)
56+
if err != nil {
57+
return fmt.Errorf("failed to create blockstore delete block counter: %w", err)
58+
}
59+
60+
has, err := meter.Int64Counter(
61+
"blockstore_has",
62+
metric.WithDescription("Blockstore has operation"),
63+
)
64+
if err != nil {
65+
return fmt.Errorf("failed to create blockstore has counter: %w", err)
66+
}
67+
68+
get, err := meter.Int64Counter(
69+
"blockstore_get",
70+
metric.WithDescription("Blockstore get operation"),
71+
)
72+
if err != nil {
73+
return fmt.Errorf("failed to create blockstore get counter: %w", err)
74+
}
75+
76+
getSize, err := meter.Int64Counter(
77+
"blockstore_get_size",
78+
metric.WithDescription("Blockstore get size operation"),
79+
)
80+
if err != nil {
81+
return fmt.Errorf("failed to create blockstore get size counter: %w", err)
82+
}
83+
84+
put, err := meter.Int64Counter(
85+
"blockstore_put",
86+
metric.WithDescription("Blockstore put operation"),
87+
)
88+
if err != nil {
89+
return fmt.Errorf("failed to create blockstore put counter: %w", err)
90+
}
91+
92+
putMany, err := meter.Int64Counter(
93+
"blockstore_put_many",
94+
metric.WithDescription("Blockstore put many operation"),
95+
)
96+
if err != nil {
97+
return fmt.Errorf("failed to create blockstore put many counter: %w", err)
98+
}
99+
100+
allKeysChan, err := meter.Int64Counter(
101+
"blockstore_all_keys_chan",
102+
metric.WithDescription("Blockstore all keys chan operation"),
103+
)
104+
if err != nil {
105+
return fmt.Errorf("failed to create blockstore all keys chan counter: %w", err)
106+
}
107+
108+
w.metrics = &metrics{
109+
delete: del,
110+
has: has,
111+
get: get,
112+
getSize: getSize,
113+
put: put,
114+
putMany: putMany,
115+
allKeysChan: allKeysChan,
116+
}
117+
return nil
118+
}
119+
120+
func (w *BlockstoreWithMetrics) DeleteBlock(ctx context.Context, cid cid.Cid) error {
121+
if w.metrics == nil {
122+
return w.bs.DeleteBlock(ctx, cid)
123+
}
124+
err := w.bs.DeleteBlock(ctx, cid)
125+
w.metrics.delete.Add(ctx, 1, metric.WithAttributes(
126+
attribute.Bool(failedKey, err != nil),
127+
))
128+
return err
129+
}
130+
131+
func (w *BlockstoreWithMetrics) Has(ctx context.Context, cid cid.Cid) (bool, error) {
132+
if w.metrics == nil {
133+
return w.bs.Has(ctx, cid)
134+
}
135+
has, err := w.bs.Has(ctx, cid)
136+
notFound := errors.Is(err, ipld.ErrNotFound{})
137+
failed := err != nil && !notFound
138+
w.metrics.has.Add(ctx, 1, metric.WithAttributes(
139+
attribute.Bool(failedKey, failed),
140+
attribute.Bool(notFoundKey, notFound),
141+
))
142+
return has, err
143+
}
144+
145+
func (w *BlockstoreWithMetrics) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
146+
if w.metrics == nil {
147+
return w.bs.Get(ctx, cid)
148+
}
149+
get, err := w.bs.Get(ctx, cid)
150+
notFound := errors.Is(err, ipld.ErrNotFound{})
151+
failed := err != nil && !notFound
152+
w.metrics.get.Add(ctx, 1, metric.WithAttributes(
153+
attribute.Bool(failedKey, failed),
154+
attribute.Bool(notFoundKey, notFound),
155+
))
156+
return get, err
157+
}
158+
159+
func (w *BlockstoreWithMetrics) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
160+
if w.metrics == nil {
161+
return w.bs.GetSize(ctx, cid)
162+
}
163+
size, err := w.bs.GetSize(ctx, cid)
164+
notFound := errors.Is(err, ipld.ErrNotFound{})
165+
failed := err != nil && !notFound
166+
w.metrics.getSize.Add(ctx, 1, metric.WithAttributes(
167+
attribute.Bool(failedKey, failed),
168+
attribute.Bool(notFoundKey, notFound),
169+
))
170+
return size, err
171+
}
172+
173+
func (w *BlockstoreWithMetrics) Put(ctx context.Context, block blocks.Block) error {
174+
if w.metrics == nil {
175+
return w.bs.Put(ctx, block)
176+
}
177+
err := w.bs.Put(ctx, block)
178+
w.metrics.put.Add(ctx, 1, metric.WithAttributes(
179+
attribute.Bool(failedKey, err != nil),
180+
))
181+
return err
182+
}
183+
184+
func (w *BlockstoreWithMetrics) PutMany(ctx context.Context, blocks []blocks.Block) error {
185+
if w.metrics == nil {
186+
return w.bs.PutMany(ctx, blocks)
187+
}
188+
err := w.bs.PutMany(ctx, blocks)
189+
w.metrics.putMany.Add(ctx, 1, metric.WithAttributes(
190+
attribute.Bool(failedKey, err != nil),
191+
))
192+
return err
193+
}
194+
195+
func (w *BlockstoreWithMetrics) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
196+
if w.metrics == nil {
197+
return w.bs.AllKeysChan(ctx)
198+
}
199+
ch, err := w.bs.AllKeysChan(ctx)
200+
w.metrics.allKeysChan.Add(ctx, 1, metric.WithAttributes(
201+
attribute.Bool(failedKey, err != nil),
202+
))
203+
return ch, err
204+
}
205+
206+
func (w *BlockstoreWithMetrics) HashOnRead(enabled bool) {
207+
w.bs.HashOnRead(enabled)
208+
}

0 commit comments

Comments
 (0)