Skip to content

Commit aa8fff5

Browse files
authored
Merge pull request #464 from celestiaorg/hlib/shares-batching
2 parents bf8c028 + e5ecb0a commit aa8fff5

File tree

3 files changed

+66
-11
lines changed

3 files changed

+66
-11
lines changed

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ require (
2121
github.com/ipfs/go-ds-badger2 v0.1.1
2222
github.com/ipfs/go-ipfs-blockstore v0.1.6
2323
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
24+
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
2425
github.com/ipfs/go-ipfs-routing v0.1.0
2526
github.com/ipfs/go-ipld-format v0.2.0
2627
github.com/ipfs/go-log/v2 v2.5.0

ipld/read_test.go

+43
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,14 @@ import (
1010
"testing"
1111
"time"
1212

13+
"github.com/ipfs/go-blockservice"
1314
"github.com/ipfs/go-cid"
15+
ds "github.com/ipfs/go-datastore"
16+
dssync "github.com/ipfs/go-datastore/sync"
17+
blockstore "github.com/ipfs/go-ipfs-blockstore"
18+
offline "github.com/ipfs/go-ipfs-exchange-offline"
1419
format "github.com/ipfs/go-ipld-format"
20+
"github.com/ipfs/go-merkledag"
1521
mdutils "github.com/ipfs/go-merkledag/test"
1622
"github.com/stretchr/testify/assert"
1723
"github.com/stretchr/testify/require"
@@ -365,6 +371,43 @@ func TestGetLeavesByNamespace_MultipleRowsContainingSameNamespaceId(t *testing.T
365371

366372
}
367373

374+
func TestBatchSize(t *testing.T) {
375+
tests := []struct {
376+
name string
377+
origWidth int
378+
}{
379+
{"2", 2},
380+
{"4", 4},
381+
{"8", 8},
382+
{"16", 16},
383+
{"32", 32},
384+
// {"64", 64}, // test case too large for CI with race detector
385+
}
386+
for _, tt := range tests {
387+
t.Run(tt.name, func(t *testing.T) {
388+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(tt.origWidth))
389+
defer cancel()
390+
391+
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
392+
dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
393+
394+
eds := generateRandEDS(t, tt.origWidth)
395+
_, err := PutData(ctx, ExtractODSShares(eds), dag)
396+
require.NoError(t, err)
397+
398+
out, err := bs.AllKeysChan(ctx)
399+
require.NoError(t, err)
400+
401+
var count int
402+
for range out {
403+
count++
404+
}
405+
extendedWidth := tt.origWidth * 2
406+
assert.Equalf(t, count, batchSize(extendedWidth), "batchSize(%v)", extendedWidth)
407+
})
408+
}
409+
}
410+
368411
func putErasuredDataToDag(t *testing.T, rawData [][]byte) (format.DAGService, da.DataAvailabilityHeader) {
369412
// calc square size
370413
squareSize := uint64(math.Sqrt(float64(len(rawData))))

ipld/write.go

+22-11
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,17 @@ import (
1313
"github.com/tendermint/tendermint/pkg/wrapper"
1414
)
1515

16-
// BatchSize defines an amount of IPLD Nodes to be buffered and written at once.
17-
// This configuration is very database backend specific and the current default(128) is not optimized for the version of
18-
// Badger we use. We set it to one to avoid test flakiness, as some tests may read for data that was not written yet.
19-
// TODO(@Wondertan): Find out the perfect value for Badger(e.g. ask PL folks) or migrate to go-car IPLD
20-
// storage(preferred).
21-
const BatchSize = 1
22-
2316
// PutData posts erasured block data to IPFS using the provided ipld.NodeAdder.
2417
func PutData(ctx context.Context, shares [][]byte, adder ipld.NodeAdder) (*rsmt2d.ExtendedDataSquare, error) {
2518
if len(shares) == 0 {
2619
return nil, fmt.Errorf("empty data") // empty block is not an empty Data
2720
}
28-
// create nmt adder wrapping batch adder
29-
batchAdder := NewNmtNodeAdder(ctx, ipld.NewBatch(ctx, adder, ipld.MaxSizeBatchOption(BatchSize)))
21+
squareSize := int(math.Sqrt(float64(len(shares))))
22+
// create nmt adder wrapping batch adder with calculated size
23+
bs := batchSize(squareSize * 2)
24+
batchAdder := NewNmtNodeAdder(ctx, ipld.NewBatch(ctx, adder, ipld.MaxSizeBatchOption(bs)))
3025
// create the nmt wrapper to generate row and col commitments
31-
squareSize := uint64(math.Sqrt(float64(len(shares))))
32-
tree := wrapper.NewErasuredNamespacedMerkleTree(squareSize, nmt.NodeVisitor(batchAdder.Visit))
26+
tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(squareSize), nmt.NodeVisitor(batchAdder.Visit))
3327
// recompute the eds
3428
eds, err := rsmt2d.ComputeExtendedDataSquare(shares, rsmt2d.NewRSGF8Codec(), tree.Constructor)
3529
if err != nil {
@@ -55,3 +49,20 @@ func ExtractODSShares(eds *rsmt2d.ExtendedDataSquare) [][]byte {
5549
}
5650
return origShares
5751
}
52+
53+
// batchSize calculates the amount of nodes that are generated from block of 'squareSizes'
54+
// to be batched in one write.
55+
func batchSize(squareSize int) int {
56+
// (squareSize*2-1) - amount of nodes in a generated binary tree
57+
// squareSize*2 - the total number of trees, both for rows and cols
58+
// (squareSize*squareSize) - all the shares
59+
//
60+
// Note that while our IPLD tree looks like this:
61+
// ---X
62+
// -X---X
63+
// X-X-X-X
64+
// X-X-X-X
65+
// here we count leaves only once: the CIDs are the same for columns and rows
66+
// and for the last two layers as well:
67+
return (squareSize*2-1)*squareSize*2 - (squareSize * squareSize)
68+
}

0 commit comments

Comments
 (0)