Commit e917062

Add new option to allow BlocksReExecutor to commit state to disk
Add a new boolean option to BlocksReExecutor, CommitStateToDisk, so that BlocksReExecutor not only re-executes the blocks in the configured Blocks ranges but also commits their state to the triedb. Signed-off-by: Igor Braga <[email protected]>
1 parent f8bbec4 commit e917062
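For context on what the new option operates over: the Blocks config is a JSON-encoded list of [start, end] block-number pairs (see the blocks flag help in the diff below). The following stdlib-only Go sketch parses that format; the variable names are illustrative and not taken from the repository:

package main

import (
    "encoding/json"
    "fmt"
    "log"
)

func main() {
    // Same format as Config.Blocks in this commit: a JSON list of
    // [start, end] block-number pairs; [[0,0]] means chain start to chain end.
    blocksJSON := `[[0, 42], [90, 160], [180, 200]]`

    var ranges [][2]uint64
    if err := json.Unmarshal([]byte(blocksJSON), &ranges); err != nil {
        log.Fatalf("invalid blocks config: %v", err)
    }
    for _, r := range ranges {
        fmt.Printf("would re-execute blocks %d..%d\n", r[0], r[1])
    }
}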

File tree: 3 files changed, +173 −30 lines

blocks_reexecutor/blocks_reexecutor.go

Lines changed: 30 additions & 5 deletions
@@ -34,6 +34,7 @@ type Config struct {
     Enable bool `koanf:"enable"`
     Mode string `koanf:"mode"`
     Blocks string `koanf:"blocks"` // Range of blocks to be executed in json format
+    CommitStateToDisk bool `koanf:"commit-state-to-disk"`
     Room int `koanf:"room"`
     MinBlocksPerThread uint64 `koanf:"min-blocks-per-thread"`
     TrieCleanLimit int `koanf:"trie-clean-limit"`
@@ -71,6 +72,7 @@ var DefaultConfig = Config{
     Mode: "random",
     Room: util.GoMaxProcs(),
     Blocks: `[[0,0]]`, // execute from chain start to chain end
+    CommitStateToDisk: false,
     MinBlocksPerThread: 0,
     TrieCleanLimit: 0,
     ValidateMultiGas: false,
@@ -81,6 +83,7 @@ var TestConfig = Config{
     Enable: true,
     Mode: "full",
     Blocks: `[[0,0]]`, // execute from chain start to chain end
+    CommitStateToDisk: false,
     Room: util.GoMaxProcs(),
     TrieCleanLimit: 600,
     MinBlocksPerThread: 0,
@@ -93,6 +96,7 @@ func ConfigAddOptions(prefix string, f *pflag.FlagSet) {
     f.Bool(prefix+".enable", DefaultConfig.Enable, "enables re-execution of a range of blocks against historic state")
     f.String(prefix+".mode", DefaultConfig.Mode, "mode to run the blocks-reexecutor on. Valid modes full and random. full - execute all the blocks in the given range. random - execute a random sample range of blocks with in a given range")
     f.String(prefix+".blocks", DefaultConfig.Blocks, "json encoded list of block ranges in the form of start and end block numbers in a list of size 2")
+    f.Bool(prefix+".commit-state-to-disk", DefaultConfig.CommitStateToDisk, "if set, blocks-reexecutor not only re-executes blocks but it also commits their state to triedb")
     f.Int(prefix+".room", DefaultConfig.Room, "number of threads to parallelize blocks re-execution")
     f.Uint64(prefix+".min-blocks-per-thread", DefaultConfig.MinBlocksPerThread, "minimum number of blocks to execute per thread. When mode is random this acts as the size of random block range sample")
     f.Int(prefix+".trie-clean-limit", DefaultConfig.TrieCleanLimit, "memory allowance (MB) to use for caching trie nodes in memory")
@@ -110,9 +114,10 @@ type BlocksReExecutor struct {
     fatalErrChan chan error
     blocks [][3]uint64 // start, end and minBlocksPerThread of block ranges
     mutex sync.Mutex
+    success chan struct{}
 }
 
-func New(c *Config, blockchain *core.BlockChain, ethDb ethdb.Database, fatalErrChan chan error) (*BlocksReExecutor, error) {
+func New(c *Config, blockchain *core.BlockChain, ethDb ethdb.Database) (*BlocksReExecutor, error) {
     if blockchain.TrieDB().Scheme() == rawdb.PathScheme {
         return nil, errors.New("blocksReExecutor not supported on pathdb")
     }
@@ -197,7 +202,8 @@ func New(c *Config, blockchain *core.BlockChain, ethDb ethdb.Database, fatalErrC
         stateFor: stateForFunc,
         blocks: blocks,
         done: make(chan struct{}, c.Room),
-        fatalErrChan: fatalErrChan,
+        fatalErrChan: make(chan error, 10),
+        success: make(chan struct{}),
         mutex: sync.Mutex{},
     }
     return blocksReExecutor, nil
@@ -260,7 +266,7 @@ func (s *BlocksReExecutor) Impl(ctx context.Context, startBlock, currentBlock, m
     return currentBlock
 }
 
-func (s *BlocksReExecutor) Start(ctx context.Context, done chan struct{}) {
+func (s *BlocksReExecutor) Start(ctx context.Context) {
     s.StopWaiter.Start(ctx, s)
     s.LaunchThread(func(ctx context.Context) {
         // Using returned value from Impl we can avoid duplicate reexecution of blocks
@@ -273,12 +279,23 @@ func (s *BlocksReExecutor) Start(ctx context.Context, done chan struct{}) {
                 log.Info("BlocksReExecutor successfully completed re-execution of blocks against historic state", "stateAt", blocks[0], "startBlock", blocks[0]+1, "endBlock", blocks[1])
             }
         }
-        if done != nil {
-            close(done)
+        if s.success != nil {
+            close(s.success)
         }
     })
 }
 
+func (s *BlocksReExecutor) WaitForReExecution(ctx context.Context) error {
+    select {
+    case err := <-s.fatalErrChan:
+        log.Error("shutting BlocksReExecutor down due to fatal error", "err", err)
+        return fmt.Errorf("shutting BlocksReExecutor down due to fatal error %v", err)
+    case <-s.success:
+    }
+
+    return nil
+}
+
 func (s *BlocksReExecutor) StopAndWait() {
     s.StopWaiter.StopAndWait()
 }
@@ -299,6 +316,14 @@ func (s *BlocksReExecutor) commitStateAndVerify(statedb *state.StateDB, expected
     if result != expected {
         return nil, arbitrum.NoopStateRelease, fmt.Errorf("bad root hash expected: %v got: %v", expected, result)
     }
+
+    if s.config.CommitStateToDisk {
+        err = s.db.TrieDB().Commit(expected, false)
+        if err != nil {
+            return nil, arbitrum.NoopStateRelease, fmt.Errorf("trieDB commit failed in commitStateAndVerify, number %d root %v: %w", blockNumber, expected, err)
+        }
+    }
+
     sdb, err := state.New(result, s.db)
     if err == nil {
         _ = s.db.TrieDB().Reference(result, common.Hash{})

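The refactor above replaces the caller-supplied done channel with an internal success channel plus an internally owned fatalErrChan, surfaced through the new WaitForReExecution method. Below is a minimal, self-contained sketch of that channel pattern; the toyReExecutor type and doWork function are hypothetical stand-ins introduced only for illustration:

package main

import (
    "context"
    "fmt"
)

// toyReExecutor models the wiring added in this commit: Start closes
// success on completion (closing a channel broadcasts to every waiter),
// or reports on the buffered fatalErrChan; WaitForReExecution blocks on
// whichever happens first, mirroring the two-way select in the diff above.
type toyReExecutor struct {
    fatalErrChan chan error
    success      chan struct{}
}

func newToyReExecutor() *toyReExecutor {
    return &toyReExecutor{
        fatalErrChan: make(chan error, 10), // buffered, so workers never block when reporting
        success:      make(chan struct{}),
    }
}

func (s *toyReExecutor) Start(ctx context.Context) {
    go func() {
        if err := doWork(ctx); err != nil {
            s.fatalErrChan <- err
            return
        }
        close(s.success)
    }()
}

func (s *toyReExecutor) WaitForReExecution(ctx context.Context) error {
    select {
    case err := <-s.fatalErrChan:
        return fmt.Errorf("shutting down due to fatal error: %w", err)
    case <-s.success:
        return nil
    }
}

// doWork stands in for re-executing a range of blocks.
func doWork(ctx context.Context) error { return nil }

func main() {
    r := newToyReExecutor()
    r.Start(context.Background())
    if err := r.WaitForReExecution(context.Background()); err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("re-execution finished")
}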
cmd/nitro/nitro.go

Lines changed: 7 additions & 9 deletions
@@ -466,27 +466,23 @@ func mainImpl() int {
         return 1
     }
 
-    fatalErrChan := make(chan error, 10)
-
     if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil {
         if !nodeConfig.Init.ThenQuit {
            log.Error("blocks-reexecutor cannot be enabled without --init.then-quit")
            return 1
         }
-        blocksReExecutor, err := blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, chainDb, fatalErrChan)
+        blocksReExecutor, err := blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, chainDb)
         if err != nil {
            log.Error("error initializing blocksReExecutor", "err", err)
            return 1
         }
-        success := make(chan struct{})
-        blocksReExecutor.Start(ctx, success)
+
+        blocksReExecutor.Start(ctx)
         deferFuncs = append(deferFuncs, func() { blocksReExecutor.StopAndWait() })
-        select {
-        case err := <-fatalErrChan:
-            log.Error("shutting down due to fatal error", "err", err)
+        err = blocksReExecutor.WaitForReExecution(ctx)
+        if err != nil {
            defer log.Error("shut down due to fatal error", "err", err)
            return 1
-        case <-success:
         }
     }
 
@@ -510,6 +506,8 @@ func mainImpl() int {
         return 1
     }
 
+    fatalErrChan := make(chan error, 10)
+
     var valNode *valnode.ValidationNode
     if sameProcessValidationNodeEnabled {
         valNode, err = valnode.CreateValidationNode(

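One detail worth noting in the change above: both the node's fatalErrChan (now created later in mainImpl) and the re-executor's internal error channel are buffered with capacity 10, so goroutines that hit a fatal error can report it and exit even before anyone is receiving. A small stdlib-only demonstration of that property; the worker naming is illustrative:

package main

import (
    "fmt"
    "sync"
)

func main() {
    // Buffered like the fatal error channels in this commit.
    fatalErrChan := make(chan error, 10)

    // Several workers report fatal errors; none of them blocks, because
    // the buffer absorbs the sends before any receiver exists.
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fatalErrChan <- fmt.Errorf("worker %d failed", id)
        }(i)
    }
    wg.Wait()

    // A waiter such as WaitForReExecution only ever consumes the first
    // error; any others simply stay queued until shutdown.
    fmt.Println("shutting down due to:", <-fatalErrChan)
}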
system_tests/blocks_reexecutor_test.go

Lines changed: 136 additions & 16 deletions
@@ -2,10 +2,14 @@ package arbtest
 
 import (
     "context"
+    "encoding/json"
+    "errors"
     "testing"
 
     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/core/rawdb"
+    "github.com/ethereum/go-ethereum/rpc"
+    "github.com/ethereum/go-ethereum/trie"
 
     blocksreexecutor "github.com/offchainlabs/nitro/blocks_reexecutor"
 )
@@ -36,7 +40,6 @@ func testBlocksReExecutorModes(t *testing.T, onMultipleRanges bool) {
     l2info := builder.L2Info
     client := builder.L2.Client
     blockchain := builder.L2.ExecNode.Backend.ArbInterface().BlockChain()
-    feedErrChan := make(chan error, 10)
 
     l2info.GenerateAccount("User2")
     genesis, err := client.BlockNumber(ctx)
@@ -62,27 +65,144 @@ func testBlocksReExecutorModes(t *testing.T, onMultipleRanges bool) {
     // Reexecute blocks at mode full
     c.MinBlocksPerThread = 10
     Require(t, c.Validate())
-    executorFull, err := blocksreexecutor.New(&c, blockchain, builder.L2.ExecNode.ChainDB, feedErrChan)
+    executorFull, err := blocksreexecutor.New(&c, blockchain, builder.L2.ExecNode.ChainDB)
+    Require(t, err)
+    executorFull.Start(ctx)
+    err = executorFull.WaitForReExecution(ctx)
     Require(t, err)
-    success := make(chan struct{})
-    executorFull.Start(ctx, success)
-    select {
-    case err := <-feedErrChan:
-        t.Fatalf("error occurred: %v", err)
-    case <-success:
-    }
 
     // Reexecute blocks at mode random
     c.Mode = "random"
     c.MinBlocksPerThread = 20
     Require(t, c.Validate())
-    executorRandom, err := blocksreexecutor.New(&c, blockchain, builder.L2.ExecNode.ChainDB, feedErrChan)
+    executorRandom, err := blocksreexecutor.New(&c, blockchain, builder.L2.ExecNode.ChainDB)
+    Require(t, err)
+    executorRandom.Start(ctx)
+    err = executorRandom.WaitForReExecution(ctx)
+    Require(t, err)
+}
+
+func TestBlocksReExecutorCommitState(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    builder := NewNodeBuilder(ctx).DefaultConfig(t, false).WithDatabase(rawdb.DBPebble)
+    // For now PathDB is not supported
+    builder.RequireScheme(t, rawdb.HashScheme)
+
+    // 1. Setup builder to be run in sparse archive mode
+    builder.execConfig.Caching.Archive = true
+    builder.execConfig.Caching.SnapshotCache = 0 // disable snapshots
+    builder.execConfig.Caching.BlockAge = 0
+    builder.execConfig.Caching.MaxNumberOfBlocksToSkipStateSaving = 100
+    builder.execConfig.Caching.MaxAmountOfGasToSkipStateSaving = 0
+
+    maxRecreateStateDepth := int64(100 * 1000 * 1000)
+
+    builder.execConfig.RPC.MaxRecreateStateDepth = maxRecreateStateDepth
+    builder.execConfig.Sequencer.MaxBlockSpeed = 0
+    builder.execConfig.Sequencer.MaxTxDataSize = 150
+
+    cleanup := builder.Build(t)
+    defer cleanup()
+
+    l2info := builder.L2Info
+    client := builder.L2.Client
+    blockchain := builder.L2.ExecNode.Backend.ArbInterface().BlockChain()
+
+    // 2. Create about 500 blocks
+    l2info.GenerateAccount("User2")
+    genesis, err := client.BlockNumber(ctx)
+    Require(t, err)
+    for i := genesis; i < genesis+500; i++ {
+        tx := l2info.PrepareTx("Owner", "User2", l2info.TransferGas, common.Big1, nil)
+        err := client.SendTransaction(ctx, tx)
+        Require(t, err)
+        receipt, err := EnsureTxSucceeded(ctx, client, tx)
+        Require(t, err)
+        if have, want := receipt.BlockNumber.Uint64(), uint64(i)+1; have != want {
+            Fatal(t, "internal test error - tx got included in unexpected block number, have:", have, "want:", want)
+        }
+    }
+
+    // 3. Pick 2 blocks as targets for the state checks below
+    bc := builder.L2.ExecNode.Backend.ArbInterface().BlockChain()
+    blockNum35 := 35
+    block35 := bc.GetBlockByNumber(uint64(blockNum35))
+    blockRoot35 := block35.Root()
+
+    blockNum150 := 150
+    block150 := bc.GetBlockByNumber(uint64(blockNum150))
+    blockRoot150 := block150.Root()
+
+    // 4. Since we started the node as a sparse archive we should only expect states to be
+    // present for blocks 100, 200, 300, etc.
+    _, err = bc.StateAt(blockRoot35)
+    expectedErr := &trie.MissingNodeError{}
+    if !errors.As(err, &expectedErr) {
+        Fatal(t, "getting state failed with unexpected error, root:", block35.Root(), "blockNumber:", blockNum35, "blockHash:", block35.Hash(), "err:", err)
+    }
+
+    _, err = bc.StateAt(blockRoot150)
+    expectedErr = &trie.MissingNodeError{}
+    if !errors.As(err, &expectedErr) {
+        Fatal(t, "getting state failed with unexpected error, root:", block150.Root(), "blockNumber:", blockNum150, "blockHash:", block150.Hash(), "err:", err)
+    }
+
+    blockTraceConfig := map[string]interface{}{"tracer": "callTracer"}
+    blockTraceConfig["tracerConfig"] = map[string]interface{}{"onlyTopCall": false}
+
+    l2rpc := builder.L2.Stack.Attach()
+    var blockTrace json.RawMessage
+    err = l2rpc.CallContext(ctx, &blockTrace, "debug_traceBlockByNumber", rpc.BlockNumber(blockNum150), blockTraceConfig)
+    Require(t, err)
+
+    // 5. We first run BlocksReExecutor with CommitStateToDisk set to false to make sure
+    // BlocksReExecutor does not commit state to disk
+    c := blocksreexecutor.TestConfig
+    c.ValidateMultiGas = true
+    c.Blocks = `[[0, 42], [90, 160], [180, 200]]`
+    // We don't need to explicitly set this to false since it is the default, but we want to be explicit
+    c.CommitStateToDisk = false
+
+    // Reexecute blocks at mode full
+    c.MinBlocksPerThread = 10
+    Require(t, c.Validate())
+    executorFull, err := blocksreexecutor.New(&c, blockchain, builder.L2.ExecNode.ChainDB)
+    Require(t, err)
+    executorFull.Start(ctx)
+    err = executorFull.WaitForReExecution(ctx)
     Require(t, err)
-    success = make(chan struct{})
-    executorRandom.Start(ctx, success)
-    select {
-    case err := <-feedErrChan:
-        t.Fatalf("error occurred: %v", err)
-    case <-success:
+
+    // 6. Now that we have run the blocks re-executor with CommitStateToDisk set to false
+    // we should expect state to NOT be present for both of the above blocks
+    _, err = bc.StateAt(blockRoot35)
+    if !errors.As(err, &expectedErr) {
+        Fatal(t, "getting state failed with unexpected error, root:", block35.Root(), "blockNumber:", blockNum35, "blockHash:", block35.Hash(), "err:", err)
+    }
+
+    _, err = bc.StateAt(blockRoot150)
+    expectedErr = &trie.MissingNodeError{}
+    if !errors.As(err, &expectedErr) {
+        Fatal(t, "getting state failed with unexpected error, root:", block150.Root(), "blockNumber:", blockNum150, "blockHash:", block150.Hash(), "err:", err)
     }
+
+    // 7. Now we run BlocksReExecutor with CommitStateToDisk set to true to make sure
+    // BlocksReExecutor does indeed commit state of c.Blocks to disk.
+    // We don't set c.Blocks since we want to use the same blocks range
+    c.CommitStateToDisk = true
+    Require(t, c.Validate())
+    executorFullCommit, err := blocksreexecutor.New(&c, blockchain, builder.L2.ExecNode.ChainDB)
+    Require(t, err)
+    executorFullCommit.Start(ctx)
+    err = executorFullCommit.WaitForReExecution(ctx)
+    Require(t, err)
+
+    // 8. Now that we have run the blocks re-executor with CommitStateToDisk set to true
+    // we should expect state to be present for both of the above blocks
+    _, err = bc.StateAt(blockRoot35)
+    Require(t, err)
+
+    _, err = bc.StateAt(blockRoot150)
+    Require(t, err)
 }

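A side note on the state checks in the test above: expectedErr is already a pointer (*trie.MissingNodeError), so errors.As is handed a pointer to that pointer and fills it in on a match. A stdlib-only sketch of the same pattern, with a hypothetical missingNodeError type standing in for go-ethereum's:

package main

import (
    "errors"
    "fmt"
)

// missingNodeError stands in for go-ethereum's *trie.MissingNodeError,
// which is returned as a pointer.
type missingNodeError struct{ root string }

func (e *missingNodeError) Error() string {
    return fmt.Sprintf("missing trie node for root %s", e.root)
}

func stateAt(root string) error {
    // Pretend the state for this root was never committed to disk.
    return fmt.Errorf("opening state: %w", &missingNodeError{root: root})
}

func main() {
    err := stateAt("0xabc")

    // As in the test: the target is a pointer to the concrete pointer type,
    // so errors.As unwraps the chain and fills it in on a match.
    expectedErr := &missingNodeError{}
    if errors.As(err, &expectedErr) {
        fmt.Println("state not on disk, as expected:", expectedErr)
    } else {
        fmt.Println("unexpected error:", err)
    }
}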