Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 79 additions & 88 deletions fibre/store.go
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,8 @@

"github.com/celestiaorg/celestia-app/v9/x/fibre/types"
pebbledb "github.com/cockroachdb/pebble/v2"
"github.com/cockroachdb/pebble/v2/vfs"
gogoproto "github.com/cosmos/gogoproto/proto"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
dssync "github.com/ipfs/go-datastore/sync"
badger "github.com/ipfs/go-ds-badger4"
pebble "github.com/ipfs/go-ds-pebble"
)

// ErrStoreNotFound is returned when no shard is found for a [Commitment] in the [Store].
Expand Down Expand Up @@ -44,48 +40,28 @@
// It provides indexed access by [Commitment], promise hash, and timestamp.
type Store struct {
cfg StoreConfig
ds ds.Batching
db *pebbledb.DB
}

// NewMemoryStore creates a new [Store] with an in-memory datastore.
// NewMemoryStore creates a new [Store] with an in-memory Pebble database.
func NewMemoryStore(cfg StoreConfig) *Store {
db, err := pebbledb.Open("", &pebbledb.Options{FS: vfs.NewMem()})
if err != nil {
panic(fmt.Sprintf("opening in-memory pebble: %v", err))
}
return &Store{
cfg: cfg,
ds: dssync.MutexWrap(ds.NewMapDatastore()),
db: db,
}
}

// NewBadgerStore creates a new [Store] with a badger4 datastore at the given path.
// Tuned for FIBRE's use case: large values (32KB rows), bulk writes/reads.
// NewBadgerStore creates a new [Store] backed by Pebble.
// Retained for test compatibility; delegates to [NewPebbleStore].
func NewBadgerStore(cfg StoreConfig) (*Store, error) {
opts := badger.DefaultOptions

// Value log settings - optimized for large values (32KB rows)
opts.ValueThreshold = 1024 // Values > 1KB go to value log (default 1MB is too high)

// Compaction settings - reduce write stalls during bulk writes
opts.NumMemtables = 5 // More memtables before stall (default 5)
opts.NumLevelZeroTables = 5 // L0 tables before compaction starts (default 5)
opts.NumLevelZeroTablesStall = 15 // L0 tables before write stall (default 15)
opts.NumCompactors = 4 // Parallel compaction goroutines (default 4)

// GC settings - for time-based pruning workload
opts.GcDiscardRatio = 0.2
opts.GcSleep = time.Second
opts.GcInterval = time.Minute

bds, err := badger.NewDatastore(cfg.Path, &opts)
if err != nil {
return nil, fmt.Errorf("creating badger datastore: %w", err)
}

return &Store{
cfg: cfg,
ds: bds,
}, nil
return NewPebbleStore(cfg)
Comment thread
walldiss marked this conversation as resolved.
Outdated
}

// NewPebbleStore creates a new [Store] with a pebble datastore at the given path.
// NewPebbleStore creates a new [Store] with a Pebble database at the given path.
// Tuned for FIBRE's use case: large values (32KB rows), bulk writes/reads.
func NewPebbleStore(cfg StoreConfig) (*Store, error) {
opts := &pebbledb.Options{}
Expand All @@ -110,14 +86,14 @@
}
}

pds, err := pebble.NewDatastore(cfg.Path, pebble.WithPebbleOpts(opts))
db, err := pebbledb.Open(cfg.Path, opts)
if err != nil {
return nil, fmt.Errorf("creating pebble datastore: %w", err)
return nil, fmt.Errorf("opening pebble database: %w", err)
}

return &Store{
cfg: cfg,
ds: pds,
db: db,
}, nil
}

Expand All @@ -129,11 +105,9 @@
// determining when the entry will be removed by [PruneBefore].
//
// Puts for the same commitments but different promises are allowed and are stored independently without deduplication.
func (s *Store) Put(ctx context.Context, promise *PaymentPromise, shard *types.BlobShard, pruneAt time.Time) error {
batch, err := s.ds.Batch(ctx)
if err != nil {
return fmt.Errorf("creating batch: %w", err)
}
func (s *Store) Put(_ context.Context, promise *PaymentPromise, shard *types.BlobShard, pruneAt time.Time) error {
batch := s.db.NewBatch()
defer batch.Close()

// write payment promise
promiseProto, err := promise.ToProto()
Expand All @@ -148,7 +122,7 @@
if err != nil {
return fmt.Errorf("getting promise hash: %w", err)
}
if err := batch.Put(ctx, promiseKey(promiseHash), ppData); err != nil {
if err := batch.Set(promiseKey(promiseHash), ppData, pebbledb.NoSync); err != nil {
return fmt.Errorf("putting payment promise: %w", err)
}

Expand All @@ -157,16 +131,16 @@
if err != nil {
return fmt.Errorf("marshaling shard: %w", err)
}
if err := batch.Put(ctx, shardKey(promise.Commitment, promiseHash), shardData); err != nil {
if err := batch.Set(shardKey(promise.Commitment, promiseHash), shardData, pebbledb.NoSync); err != nil {
return fmt.Errorf("putting shard: %w", err)
}

// write prune index
if err := batch.Put(ctx, pruneKey(pruneAt, promise.Commitment, promiseHash), []byte{}); err != nil {
if err := batch.Set(pruneKey(pruneAt, promise.Commitment, promiseHash), []byte{}, pebbledb.NoSync); err != nil {
return fmt.Errorf("putting prune index: %w", err)
}

return batch.Commit(ctx)
return batch.Commit(pebbledb.NoSync)
}

// Get retrieves [types.BlobShard] for the given [Commitment].
Expand All @@ -177,24 +151,27 @@
//
// If unmarshaling fails for some entries, it continues trying others.
// Returns an error only if all entries fail to unmarshal or if no shards are found.
func (s *Store) Get(ctx context.Context, commitment Commitment) (*types.BlobShard, error) {
results, err := s.ds.Query(ctx, query.Query{
Prefix: fmt.Sprintf("/shard/%s", commitment.String()),
func (s *Store) Get(_ context.Context, commitment Commitment) (*types.BlobShard, error) {
prefix := []byte(fmt.Sprintf("/shard/%s/", commitment.String()))

Check failure on line 155 in fibre/store.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

fmtappendf: Replace []byte(fmt.Sprintf...) with fmt.Appendf (modernize)

Check failure on line 155 in fibre/store.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint-fibre

fmtappendf: Replace []byte(fmt.Sprintf...) with fmt.Appendf (modernize)
iter, err := s.db.NewIter(&pebbledb.IterOptions{
LowerBound: prefix,
UpperBound: prefixUpperBound(prefix),
})
if err != nil {
return nil, fmt.Errorf("querying shards: %w", err)
return nil, fmt.Errorf("creating iterator: %w", err)
}
defer results.Close()
defer iter.Close()

var rerr error
for result := range results.Next() {
if result.Error != nil {
rerr = errors.Join(rerr, result.Error)
for valid := iter.First(); valid; valid = iter.Next() {
val, err := iter.ValueAndErr()
if err != nil {
rerr = errors.Join(rerr, err)
continue
}

shard := &types.BlobShard{}
if err := gogoproto.Unmarshal(result.Value, shard); err != nil {
if err := gogoproto.Unmarshal(val, shard); err != nil {
rerr = errors.Join(rerr, fmt.Errorf("unmarshaling shard: %w", err))
continue
}
Expand All @@ -210,11 +187,12 @@
}

// GetPaymentPromise retrieves a [PaymentPromise] by its hash.
func (s *Store) GetPaymentPromise(ctx context.Context, promiseHash []byte) (*PaymentPromise, error) {
data, err := s.ds.Get(ctx, promiseKey(promiseHash))
func (s *Store) GetPaymentPromise(_ context.Context, promiseHash []byte) (*PaymentPromise, error) {
data, closer, err := s.db.Get(promiseKey(promiseHash))
if err != nil {
return nil, fmt.Errorf("getting payment promise: %w", err)
}
defer closer.Close()

var ppProto types.PaymentPromise
if err := gogoproto.Unmarshal(data, &ppProto); err != nil {
Expand All @@ -235,64 +213,61 @@
// It works by iterating over the ordered prune index and deleting each entry until the given time,
// so it iterates exactly over the entries that need to be pruned. The order is guaranteed by the
// underlying database and enforced with query.OrderByKey{}.
func (s *Store) PruneBefore(ctx context.Context, before time.Time) (int, error) {
results, err := s.ds.Query(ctx, query.Query{
Prefix: "/prune/",
KeysOnly: true,
Orders: []query.Order{query.OrderByKey{}},
func (s *Store) PruneBefore(_ context.Context, before time.Time) (int, error) {
Comment on lines -238 to +213
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion here but do we want to permanently remove the context?

prefix := []byte("/prune/")
iter, err := s.db.NewIter(&pebbledb.IterOptions{
LowerBound: prefix,
UpperBound: prefixUpperBound(prefix),
})
if err != nil {
return 0, fmt.Errorf("querying prune index: %w", err)
return 0, fmt.Errorf("creating iterator: %w", err)
}
defer results.Close()
defer iter.Close()

batch, err := s.ds.Batch(ctx)
if err != nil {
return 0, fmt.Errorf("creating batch: %w", err)
}
batch := s.db.NewBatch()
defer batch.Close()

pruned := 0
beforeStr := formatTimestamp(before.UTC())
for result := range results.Next() {
if result.Error != nil {
return pruned, fmt.Errorf("iterating results: %w", result.Error)
}
for valid := iter.First(); valid; valid = iter.Next() {
key := iter.Key()

// extract timestamp from key: /prune/YYYYMMDDHHmm/...
// early termination: keys are sorted, so if timestamp >= cutoff, we're done
timestampStr := result.Key[7:19] // skip "/prune/" and take 12 chars
keyStr := string(key)
timestampStr := keyStr[7:19] // skip "/prune/" and take 12 chars
if timestampStr >= beforeStr {
break
}

// parse key: /prune/<timestamp>/<commitment>/<promise-hash>
commitment, promiseHash, ok := parsePruneKey(result.Key)
commitment, promiseHash, ok := parsePruneKey(keyStr)
if !ok {
continue
}

// delete all related entries
if err := batch.Delete(ctx, ds.NewKey(result.Key)); err != nil {
if err := batch.Delete(key, pebbledb.NoSync); err != nil {
return pruned, fmt.Errorf("deleting prune index: %w", err)
}
if err := batch.Delete(ctx, shardKey(commitment, promiseHash)); err != nil {
if err := batch.Delete(shardKey(commitment, promiseHash), pebbledb.NoSync); err != nil {
return pruned, fmt.Errorf("deleting shard: %w", err)
}
if err := batch.Delete(ctx, promiseKey(promiseHash)); err != nil {
if err := batch.Delete(promiseKey(promiseHash), pebbledb.NoSync); err != nil {
return pruned, fmt.Errorf("deleting payment promise: %w", err)
}
pruned++
}

if err := batch.Commit(ctx); err != nil {
if err := batch.Commit(pebbledb.NoSync); err != nil {
return pruned, fmt.Errorf("committing batch: %w", err)
}
return pruned, nil
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
}

// Close closes the underlying datastore.
// Close closes the underlying Pebble database.
func (s *Store) Close() error {
return s.ds.Close()
return s.db.Close()
}

// formatTimestamp formats a timestamp with minute precision (YYYYMMDDHHmm).
Expand All @@ -301,16 +276,32 @@
return timestamp.Format("200601021504")
}

func promiseKey(promiseHash []byte) ds.Key {
return ds.NewKey(fmt.Sprintf("/pp/%s", hex.EncodeToString(promiseHash)))
func promiseKey(promiseHash []byte) []byte {
return []byte(fmt.Sprintf("/pp/%s", hex.EncodeToString(promiseHash)))

Check failure on line 280 in fibre/store.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

fmtappendf: Replace []byte(fmt.Sprintf...) with fmt.Appendf (modernize)

Check failure on line 280 in fibre/store.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint-fibre

fmtappendf: Replace []byte(fmt.Sprintf...) with fmt.Appendf (modernize)
}

func shardKey(commitment Commitment, promiseHash []byte) ds.Key {
return ds.NewKey(fmt.Sprintf("/shard/%s/%s", commitment.String(), hex.EncodeToString(promiseHash)))
func shardKey(commitment Commitment, promiseHash []byte) []byte {
return []byte(fmt.Sprintf("/shard/%s/%s", commitment.String(), hex.EncodeToString(promiseHash)))

Check failure on line 284 in fibre/store.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

fmtappendf: Replace []byte(fmt.Sprintf...) with fmt.Appendf (modernize)

Check failure on line 284 in fibre/store.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint-fibre

fmtappendf: Replace []byte(fmt.Sprintf...) with fmt.Appendf (modernize)
}

func pruneKey(pruneAt time.Time, commitment Commitment, promiseHash []byte) ds.Key {
return ds.NewKey(fmt.Sprintf("/prune/%s/%s/%s", formatTimestamp(pruneAt.UTC()), commitment.String(), hex.EncodeToString(promiseHash)))
func pruneKey(pruneAt time.Time, commitment Commitment, promiseHash []byte) []byte {
return []byte(fmt.Sprintf("/prune/%s/%s/%s", formatTimestamp(pruneAt.UTC()), commitment.String(), hex.EncodeToString(promiseHash)))
}

// prefixUpperBound returns the upper bound for a prefix scan.
// It increments the last byte of the prefix to create an exclusive upper bound.
// For example, "/shard/abc" returns "/shard/abd".
func prefixUpperBound(prefix []byte) []byte {
upper := make([]byte, len(prefix))
copy(upper, prefix)
for i := len(upper) - 1; i >= 0; i-- {
upper[i]++
if upper[i] != 0 {
return upper
}
}
// all 0xff bytes - return nil to indicate no upper bound
return nil
}

// parsePruneKey extracts commitment and promise hash from a prune index key.
Expand Down
Loading