WIP: Peerdas slotter #31966

Open · wants to merge 4 commits into base: master
48 changes: 47 additions & 1 deletion core/txpool/blobpool/blobpool.go
@@ -55,6 +55,11 @@ const (
// tiny overflows causing all txs to move a shelf higher, wasting disk space.
txAvgSize = 4 * 1024

// txBlobOverhead is an approximation of the overhead that an additional blob
// has on transaction size. This is added to the slotter to avoid
// tiny overflows causing all txs to move a shelf higher, wasting disk space.
txBlobOverhead = 2048

// txMaxSize is the maximum size a single transaction can have, outside
// the included blobs. Since blob transactions are pulled instead of pushed,
// and only a small metadata is kept in ram, the rest is on disk, there is
@@ -358,6 +363,35 @@ func (p *BlobPool) Filter(tx *types.Transaction) bool {
return tx.Type() == types.BlobTxType
}

// migrateBilly migrates the blob data from one billy instance to another
// with a different slotter.
func migrateBilly(path string, maxBlobs int) error {
oldSlotter := newSlotter(maxBlobs)
newSlotter := newSlotterEIP7594(maxBlobs)
// Open the new billy instance
newStore, err := billy.Open(billy.Options{Path: path, Repair: true}, newSlotter, nil)
if err != nil {
return err
}
// Iterate over the old billy instance and copy the data to the new instance
var indexingErr error
index := func(key uint64, size uint32, data []byte) {
if _, err := newStore.Put(data); err != nil && indexingErr == nil {
indexingErr = err // remember the first failure, don't let later puts mask it
}
}
oldStore, err := billy.Open(billy.Options{Path: path, Repair: true}, oldSlotter, index)
if err != nil {
return err
}
if indexingErr != nil {
return indexingErr
}
// TODO (MariusVanDerWijden)
// remove the old shelves here
oldStore.Close()
newStore.Close()
return nil
}

// Init sets the gas price needed to keep a transaction in the pool and the chain
// head to allow balance / nonce checks. The transaction journal will be loaded
// from disk and filtered based on the provided starting settings.
@@ -390,14 +424,26 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
}
p.head, p.state = head, state

// Check if we need to migrate the blob database to a new format
slotter := newSlotter(eip4844.LatestMaxBlobsPerBlock(p.chain.Config()))
if p.chain.Config().IsOsaka(head.Number, head.Time) {
// Check if we need to migrate our blob db to the new slotter
oldSlotSize := 135168 // blobSize + txAvgSize, the old slotter's smallest blob-carrying shelf
if _, err := os.Stat(filepath.Join(queuedir, fmt.Sprintf("bkt_%08d.bag", oldSlotSize))); err == nil {
if err := migrateBilly(queuedir, eip4844.LatestMaxBlobsPerBlock(p.chain.Config())); err != nil {
return err
}
}
slotter = newSlotterEIP7594(eip4844.LatestMaxBlobsPerBlock(p.chain.Config()))
}

// Index all transactions on disk and delete anything unprocessable
var fails []uint64
index := func(id uint64, size uint32, blob []byte) {
if p.parseTransaction(id, size, blob) != nil {
fails = append(fails, id)
}
}
slotter := newSlotter(eip4844.LatestMaxBlobsPerBlock(p.chain.Config()))
store, err := billy.Open(billy.Options{Path: queuedir, Repair: true}, slotter, index)
if err != nil {
return err
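For context on the migration trigger above: the pre-PeerDAS slotter sizes shelf n as n*blobSize + txAvgSize, so its smallest blob-carrying shelf is 131072 + 4096 = 135168 bytes, and billy names the backing file after that slot size (the bkt_%08d.bag pattern probed in Init). The EIP-7594 slotter never produces a shelf of exactly that size, so the file's presence identifies a store written with the old layout. A standalone sketch of the arithmetic, with the constant values copied from blobpool.go (illustrative only, not part of the change):

package main

import "fmt"

// Constants mirrored from core/txpool/blobpool.
const (
	blobSize       = 4096 * 32 // params.BlobTxFieldElementsPerBlob * params.BlobTxBytesPerFieldElement
	txAvgSize      = 4 * 1024  // average non-blob tx size assumed by the slotter
	txBlobOverhead = 2048      // per-blob overhead added by the EIP-7594 slotter
)

func main() {
	// Old slotter: shelf n = n*blobSize + txAvgSize. The first shelf able to
	// hold a blob is 135168 bytes, stored on disk as bkt_00135168.bag, which
	// is the file Init probes for to detect pre-PeerDAS data.
	oldShelf1 := 1*blobSize + txAvgSize
	fmt.Printf("old 1-blob shelf: %d -> bkt_%08d.bag\n", oldShelf1, oldShelf1)

	// New slotter: shelf n = n*(blobSize+txBlobOverhead) + txAvgSize, so a
	// shelf of exactly 135168 bytes never exists after the migration.
	newShelf1 := 1*(blobSize+txBlobOverhead) + txAvgSize
	fmt.Printf("new 1-blob shelf: %d\n", newShelf1)
}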
112 changes: 110 additions & 2 deletions core/txpool/blobpool/blobpool_test.go
@@ -985,7 +985,7 @@ func TestOpenCap(t *testing.T) {

keep = []common.Address{addr1, addr3}
drop = []common.Address{addr2}
size = uint64(2 * (txAvgSize + blobSize))
size = uint64(2 * (txAvgSize + blobSize + txBlobOverhead))
)
store.Put(blob1)
store.Put(blob2)
@@ -994,7 +994,7 @@

// Verify pool capping twice: first by reducing the data cap, then restarting
// with a high cap to ensure everything was persisted previously
for _, datacap := range []uint64{2 * (txAvgSize + blobSize), 100 * (txAvgSize + blobSize)} {
for _, datacap := range []uint64{2 * (txAvgSize + blobSize + txBlobOverhead), 100 * (txAvgSize + blobSize + txBlobOverhead)} {
// Create a blob pool out of the pre-seeded data, but cap it to 2 blob transaction
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
statedb.AddBalance(addr1, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
@@ -1142,6 +1142,114 @@ func TestChangingSlotterSize(t *testing.T) {
}
}

// TestBillyMigration tests the billy migration from the default slotter to
// the PeerDAS slotter. It covers both the slotter migration itself and a
// subsequent increase of the new slotter's maximum blob count.
func TestBillyMigration(t *testing.T) {
//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true)))

// Create a temporary folder for the persistent backend
storage := t.TempDir()

os.MkdirAll(filepath.Join(storage, pendingTransactionStore), 0700)
// Create the billy with the old slotter
oldSlotter := newSlotter(6)
store, _ := billy.Open(billy.Options{Path: filepath.Join(storage, pendingTransactionStore)}, oldSlotter, nil)

// Create transactions from a few accounts.
var (
key1, _ = crypto.GenerateKey()
key2, _ = crypto.GenerateKey()
key3, _ = crypto.GenerateKey()

addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = crypto.PubkeyToAddress(key2.PublicKey)
addr3 = crypto.PubkeyToAddress(key3.PublicKey)

tx1 = makeMultiBlobTx(0, 1, 1000, 100, 6, key1)
tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, key2)
tx3 = makeMultiBlobTx(0, 1, 800, 110, 24, key3)

blob1, _ = rlp.EncodeToBytes(tx1)
blob2, _ = rlp.EncodeToBytes(tx2)
)

// Write the two safely sized txs to the store. Note: although the store is
// configured for a blob count of 6, it can also hold around ~1MB of calldata,
// so we aren't using the absolute largest shelf available.
store.Put(blob1)
store.Put(blob2)
store.Close()

// Mimic a blobpool with max blob count of 6 upgrading to a max blob count of 24.
for _, maxBlobs := range []int{6, 24} {
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
statedb.AddBalance(addr1, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
statedb.AddBalance(addr2, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
statedb.AddBalance(addr3, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
statedb.Commit(0, true, false)

// Make custom chain config where the max blob count changes based on the loop variable.
zero := uint64(0)
config := &params.ChainConfig{
ChainID: big.NewInt(1),
LondonBlock: big.NewInt(0),
BerlinBlock: big.NewInt(0),
CancunTime: &zero,
OsakaTime: &zero,
BlobScheduleConfig: &params.BlobScheduleConfig{
Cancun: &params.BlobConfig{
Target: maxBlobs / 2,
Max: maxBlobs,
UpdateFraction: params.DefaultCancunBlobConfig.UpdateFraction,
},
Osaka: &params.BlobConfig{
Target: maxBlobs / 2,
Max: maxBlobs,
UpdateFraction: params.DefaultCancunBlobConfig.UpdateFraction,
},
},
}
chain := &testBlockChain{
config: config,
basefee: uint256.NewInt(1050),
blobfee: uint256.NewInt(105),
statedb: statedb,
}
pool := New(Config{Datadir: storage}, chain, nil)
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
}

// Try to add the big blob tx. In the first iteration (max 6 blobs) the
// insert should fail; after the upgrade to 24 blobs it should be accepted.
errs := pool.Add([]*types.Transaction{tx3}, true)
if _, ok := pool.index[addr3]; ok && maxBlobs == 6 {
t.Errorf("expected insert of oversized blob tx to fail: blobs=24, maxBlobs=%d, err=%v", maxBlobs, errs[0])
} else if !ok && maxBlobs == 24 {
t.Errorf("expected insert of oversized blob tx to succeed: blobs=24, maxBlobs=%d, err=%v", maxBlobs, errs[0])
}

// Verify the regular two txs are always available.
if got := pool.Get(tx1.Hash()); got == nil {
t.Errorf("expected tx %s from %s in pool", tx1.Hash(), addr1)
}
if got := pool.Get(tx2.Hash()); got == nil {
t.Errorf("expected tx %s from %s in pool", tx2.Hash(), addr2)
}

// Verify all the calculated pool internals. Interestingly, this is **not**
// a duplication of the above checks, this actually validates the verifier
// using the above already hard coded checks.
//
// Do not remove this, nor alter the above to be generic.
verifyPoolInternals(t, pool)

pool.Close()
}
}

// TestBlobCountLimit tests the blobpool enforced limits on the max blob count.
func TestBlobCountLimit(t *testing.T) {
var (
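For reference on the adjusted expectations in TestOpenCap: under the EIP-7594 slotter a single-blob transaction occupies the txAvgSize + blobSize + txBlobOverhead shelf, which is why both the expected stored size and the data caps now include the overhead term. A quick check of the numbers, as a standalone sketch with the constants copied from blobpool.go:

package main

import "fmt"

func main() {
	const (
		blobSize       = 4096 * 32 // 131072
		txAvgSize      = 4 * 1024  // 4096
		txBlobOverhead = 2048
	)
	// One-blob shelf under the EIP-7594 slotter, and the cap that keeps
	// exactly two such transactions resident in TestOpenCap.
	shelf := txAvgSize + blobSize + txBlobOverhead
	fmt.Println("one-blob shelf: ", shelf)   // 137216
	fmt.Println("two-tx data cap:", 2*shelf) // 274432
}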
18 changes: 18 additions & 0 deletions core/txpool/blobpool/slotter.go
@@ -36,3 +36,21 @@ func newSlotter(maxBlobsPerTransaction int) func() (uint32, bool) {
return slotsize, finished
}
}

// newSlotterEIP7594 creates a different slotter for EIP-7594 (PeerDAS) transactions.
// EIP-7594 changes the median transaction size, which means the static 4KB
// average size is not enough anymore.
// This slotter adds a dynamic per-blob overhead component, which also captures
// the notion that blob transactions with more blobs are likely to carry more
// calldata.
func newSlotterEIP7594(maxBlobsPerTransaction int) func() (uint32, bool) {
slotsize := uint32(txAvgSize)
slotsize -= uint32(blobSize) + txBlobOverhead // underflows, it's ok, will overflow back in the first return

return func() (size uint32, done bool) {
slotsize += blobSize + txBlobOverhead
finished := slotsize > uint32(maxBlobsPerTransaction)*blobSize+txMaxSize

return slotsize, finished
}
}
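To make the shelf progression concrete, here is a standalone sketch that re-implements the closure above with the constants inlined (values as in blobpool.go, with txMaxSize being the 1MB cap on the non-blob part) and prints the shelves for a 6-blob limit. For that limit it yields the same 15 shelf sizes that appear in the updated slotter_test.go expectations below; the snippet itself is illustrative and not part of the package:

package main

import "fmt"

const (
	blobSize       = 4096 * 32   // 131072 bytes per blob
	txAvgSize      = 4 * 1024    // static average for the non-blob part
	txBlobOverhead = 2048        // extra slack per blob for the larger PeerDAS wrapper
	txMaxSize      = 1024 * 1024 // cap on the non-blob part of a transaction
)

// newSlotterEIP7594 mirrors the closure added in this diff: every additional
// blob grows the shelf by blobSize+txBlobOverhead instead of just blobSize.
func newSlotterEIP7594(maxBlobs int) func() (uint32, bool) {
	slotsize := uint32(txAvgSize)
	slotsize -= blobSize + txBlobOverhead // underflows, overflows back on the first call

	return func() (uint32, bool) {
		slotsize += blobSize + txBlobOverhead
		return slotsize, slotsize > uint32(maxBlobs)*blobSize+txMaxSize
	}
}

func main() {
	// Drive the slotter the same way billy does: keep asking for shelves
	// until it reports that the final catch-all shelf has been produced.
	next := newSlotterEIP7594(6)
	for i := 0; ; i++ {
		size, done := next()
		fmt.Printf("shelf %2d: %7d bytes\n", i, size) // txAvgSize + i*(blobSize+txBlobOverhead)
		if done {
			break
		}
	}
}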
30 changes: 15 additions & 15 deletions core/txpool/blobpool/slotter_test.go
@@ -33,21 +33,21 @@ func TestNewSlotter(t *testing.T) {
}
// Compare the database shelves to the expected ones
want := []uint32{
0*blobSize + txAvgSize, // 0 blob + some expected tx infos
1*blobSize + txAvgSize, // 1 blob + some expected tx infos
2*blobSize + txAvgSize, // 2 blob + some expected tx infos (could be fewer blobs and more tx data)
3*blobSize + txAvgSize, // 3 blob + some expected tx infos (could be fewer blobs and more tx data)
4*blobSize + txAvgSize, // 4 blob + some expected tx infos (could be fewer blobs and more tx data)
5*blobSize + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
6*blobSize + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
7*blobSize + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
8*blobSize + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
9*blobSize + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
10*blobSize + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
11*blobSize + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
12*blobSize + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
13*blobSize + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
14*blobSize + txAvgSize, // 1-6 blobs + unexpectedly large tx infos >= 4 blobs + max tx metadata size
0*blobSize + 0*txBlobOverhead + txAvgSize, // 0 blob + some expected tx infos
1*blobSize + 1*txBlobOverhead + txAvgSize, // 1 blob + some expected tx infos
2*blobSize + 2*txBlobOverhead + txAvgSize, // 2 blob + some expected tx infos (could be fewer blobs and more tx data)
3*blobSize + 3*txBlobOverhead + txAvgSize, // 3 blob + some expected tx infos (could be fewer blobs and more tx data)
4*blobSize + 4*txBlobOverhead + txAvgSize, // 4 blob + some expected tx infos (could be fewer blobs and more tx data)
5*blobSize + 5*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
6*blobSize + 6*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
7*blobSize + 7*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
8*blobSize + 8*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
9*blobSize + 9*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
10*blobSize + 10*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
11*blobSize + 11*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
12*blobSize + 12*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
13*blobSize + 13*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
14*blobSize + 14*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos >= 4 blobs + max tx metadata size
}
if len(shelves) != len(want) {
t.Errorf("shelves count mismatch: have %d, want %d", len(shelves), len(want))