Skip to content

Commit 7935f1a

Browse files
committed
feat: support generate witness in txdag async precess;
1 parent dc8da3d commit 7935f1a

6 files changed

Lines changed: 302 additions & 46 deletions

File tree

core/blockchain.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2036,6 +2036,18 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
20362036
return it.index, err
20372037
}
20382038
}
2039+
2040+
//need to collect the witness after state root generation
2041+
if bc.enableTxDAG && statedb.Witness() != nil {
2042+
witnesses, err := statedb.MVStates().ResolveROTrieWitness()
2043+
if err != nil {
2044+
log.Warn("failed to resolve ROTrieWitness", "err", err)
2045+
return it.index, err
2046+
}
2047+
for _, witness := range witnesses {
2048+
statedb.Witness().AddState(witness)
2049+
}
2050+
}
20392051
if witness := statedb.Witness(); witness != nil {
20402052
if err = bc.validator.ValidateWitness(bc, witness, block.ReceiptHash(), block.Root()); err != nil {
20412053
bc.reportBlock(block, receipts, err)
Lines changed: 183 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1-
package types
1+
package state
22

33
import (
44
"fmt"
5+
"maps"
56
"strings"
67
"sync"
78

89
"github.com/ethereum/go-ethereum/common"
10+
"github.com/ethereum/go-ethereum/core/types"
11+
"github.com/ethereum/go-ethereum/crypto"
912
"github.com/ethereum/go-ethereum/log"
1013
"github.com/ethereum/go-ethereum/metrics"
1114
"golang.org/x/exp/slices"
@@ -229,12 +232,22 @@ func (s *RWSet) String() string {
229232
}
230233

231234
const (
235+
// tx rw events
232236
NewTxRWEvent byte = iota
233237
ReadAccRWEvent
234238
WriteAccRWEvent
235239
ReadSlotRWEvent
236240
WriteSlotRWEvent
241+
242+
// feature flags
237243
CannotGasFeeDelayRWEvent
244+
245+
// state witness events
246+
OriginAccReadEvent
247+
OriginSlotReadEvent
248+
249+
// execution events
250+
ExecutionDoneEvent
238251
)
239252

240253
type RWEventItem struct {
@@ -243,6 +256,10 @@ type RWEventItem struct {
243256
Addr common.Address
244257
State AccountState
245258
Slot common.Hash
259+
260+
// for witness generation
261+
StateRoot common.Hash
262+
StorageRoot common.Hash
246263
}
247264

248265
func (e RWEventItem) String() string {
@@ -259,6 +276,12 @@ func (e RWEventItem) String() string {
259276
return fmt.Sprintf("(%v)%v|%v", e.Event, e.Addr, e.Slot)
260277
case CannotGasFeeDelayRWEvent:
261278
return fmt.Sprintf("(%v)", e.Event)
279+
case OriginAccReadEvent:
280+
return fmt.Sprintf("(%v)%v", e.Event, e.Addr)
281+
case OriginSlotReadEvent:
282+
return fmt.Sprintf("(%v)%v|%v", e.Event, e.Addr, e.Slot)
283+
case ExecutionDoneEvent:
284+
return fmt.Sprintf("(%v)", e.Event)
262285
}
263286
return "Unknown"
264287
}
@@ -356,7 +379,7 @@ type MVStates struct {
356379
gasFeeReceivers []common.Address
357380
// dependency map cache for generating TxDAG
358381
// depMapCache[i].exist(j) means j->i, and i > j
359-
txDepCache []TxDep
382+
txDepCache []types.TxDep
360383
lock sync.RWMutex
361384

362385
// async rw event recorder
@@ -369,39 +392,59 @@ type MVStates struct {
369392
recordingWrite bool
370393
asyncRunning bool
371394
asyncWG sync.WaitGroup
395+
396+
// stateless related fields
397+
asyncWitnessRunning bool
398+
trieDB Database
399+
trieCache map[common.Hash]Trie
400+
witnessCache []map[string]struct{}
401+
witnessEventCh chan RWEventItem
372402
}
373403

374-
func NewMVStates(txCount int, gasFeeReceivers []common.Address) *MVStates {
404+
func NewMVStates(txCount int, gasFeeReceivers []common.Address, trieDB Database) *MVStates {
375405
s := &MVStates{
376406
accWriteSet: make(map[common.Address]map[AccountState]*RWTxList, txCount),
377407
slotWriteSet: make(map[common.Address]map[common.Hash]*RWTxList, txCount),
378408
accReadSet: make(map[common.Address]map[AccountState]*RWTxList, txCount),
379409
slotReadSet: make(map[common.Address]map[common.Hash]*RWTxList, txCount),
380410
rwEventCh: make(chan []RWEventItem, 100),
381411
gasFeeReceivers: gasFeeReceivers,
412+
413+
// witness related
414+
witnessEventCh: make(chan RWEventItem, 1000),
415+
trieDB: trieDB,
416+
trieCache: make(map[common.Hash]Trie),
382417
}
383418
return s
384419
}
385420

386421
func (s *MVStates) EnableAsyncGen() *MVStates {
422+
// start async rw event recorder & dep analysis
387423
s.asyncWG.Add(1)
388424
s.asyncRunning = true
389425
s.rwEventCache = *rwEventCachePool.Get().(*[]RWEventItem)
390426
s.rwEventCache = s.rwEventCache[:cap(s.rwEventCache)]
391427
s.rwEventCacheIndex = 0
392428
s.asyncRWSet.index = -1
393429
go s.asyncRWEventLoop()
430+
431+
// start async witness generator
432+
s.asyncWG.Add(1)
433+
s.asyncWitnessRunning = true
434+
go s.asyncWitnessLoop()
394435
return s
395436
}
396437

397438
func (s *MVStates) Stop() {
398439
s.stopAsyncRecorder()
440+
s.stopAsyncWitness()
441+
s.asyncWG.Wait()
399442
}
400443

401444
func (s *MVStates) Copy() *MVStates {
402445
s.lock.Lock()
403446
defer s.lock.Unlock()
404-
ns := NewMVStates(len(s.rwSets), s.gasFeeReceivers)
447+
ns := NewMVStates(len(s.rwSets), s.gasFeeReceivers, s.trieDB)
405448
ns.nextFinaliseIndex = s.nextFinaliseIndex
406449
ns.txDepCache = append(ns.txDepCache, s.txDepCache...)
407450
ns.rwSets = append(ns.rwSets, s.rwSets...)
@@ -437,6 +480,15 @@ func (s *MVStates) Copy() *MVStates {
437480
ns.slotReadSet[addr][slot] = reads.Copy()
438481
}
439482
}
483+
484+
ns.trieCache = make(map[common.Hash]Trie)
485+
for key, trie := range s.trieCache {
486+
ns.trieCache[key] = ns.trieDB.CopyTrie(trie)
487+
}
488+
ns.witnessCache = make([]map[string]struct{}, len(s.witnessCache))
489+
for key, cache := range s.witnessCache {
490+
ns.witnessCache[key] = maps.Clone(cache)
491+
}
440492
return ns
441493
}
442494

@@ -454,6 +506,49 @@ func (s *MVStates) asyncRWEventLoop() {
454506
}
455507
}
456508

509+
func (s *MVStates) asyncWitnessLoop() {
510+
defer s.asyncWG.Done()
511+
for {
512+
select {
513+
case item, ok := <-s.witnessEventCh:
514+
if !ok {
515+
return
516+
}
517+
switch item.Event {
518+
case OriginAccReadEvent:
519+
key := common.Hash{}
520+
if s.trieCache[key] == nil {
521+
trie, err := s.trieDB.OpenTrie(item.StateRoot)
522+
if err != nil {
523+
log.Error("failed to account trie", "stateRoot", item.StateRoot, "error", err)
524+
continue
525+
}
526+
s.trieCache[key] = trie
527+
}
528+
// access account in trie
529+
s.trieCache[key].GetAccount(item.Addr)
530+
case OriginSlotReadEvent:
531+
key := crypto.Keccak256Hash(item.Addr.Bytes())
532+
if s.trieCache[key] == nil {
533+
trie, err := s.trieDB.OpenStorageTrie(item.StateRoot, item.Addr, item.StorageRoot, nil)
534+
if err != nil {
535+
log.Error("failed to account trie", "stateRoot", item.StateRoot, "error", err)
536+
continue
537+
}
538+
s.trieCache[key] = trie
539+
}
540+
// access storage slot in trie
541+
s.trieCache[key].GetStorage(item.Addr, item.Slot.Bytes())
542+
case ExecutionDoneEvent:
543+
// generate witness immediately
544+
for _, trie := range s.trieCache {
545+
s.witnessCache = append(s.witnessCache, trie.Witness())
546+
}
547+
}
548+
}
549+
}
550+
}
551+
457552
func (s *MVStates) handleRWEvents(items []RWEventItem) {
458553
readFrom, readTo := -1, -1
459554
writeFrom, writeTo := -1, -1
@@ -509,6 +604,13 @@ func (s *MVStates) handleRWEvents(items []RWEventItem) {
509604
// recorde current as cannot gas fee delay
510605
case CannotGasFeeDelayRWEvent:
511606
s.asyncRWSet.cannotGasFeeDelay = true
607+
// handle witness generation events
608+
case ExecutionDoneEvent, OriginAccReadEvent, OriginSlotReadEvent:
609+
// if no trieDB, skip witness generation
610+
if s.trieDB == nil {
611+
continue
612+
}
613+
s.witnessEventCh <- item
512614
}
513615
}
514616
// handle last tx rw set
@@ -695,6 +797,55 @@ func (s *MVStates) RecordCannotDelayGasFee() {
695797
s.rwEventCacheIndex++
696798
}
697799

800+
func (s *MVStates) RecordOriginAccRead(addr common.Address) {
801+
if !s.asyncRunning || !s.recordingRead {
802+
return
803+
}
804+
if s.rwEventCacheIndex < len(s.rwEventCache) {
805+
s.rwEventCache[s.rwEventCacheIndex].Event = OriginAccReadEvent
806+
s.rwEventCache[s.rwEventCacheIndex].Addr = addr
807+
s.rwEventCacheIndex++
808+
return
809+
}
810+
s.rwEventCache = append(s.rwEventCache, RWEventItem{
811+
Event: OriginAccReadEvent,
812+
Addr: addr,
813+
})
814+
s.rwEventCacheIndex++
815+
}
816+
817+
func (s *MVStates) RecordOriginSlotRead(addr common.Address, slot common.Hash) {
818+
if !s.asyncRunning || !s.recordingRead {
819+
return
820+
}
821+
if s.rwEventCacheIndex < len(s.rwEventCache) {
822+
s.rwEventCache[s.rwEventCacheIndex].Event = OriginSlotReadEvent
823+
s.rwEventCache[s.rwEventCacheIndex].Addr = addr
824+
s.rwEventCache[s.rwEventCacheIndex].Slot = slot
825+
s.rwEventCacheIndex++
826+
return
827+
}
828+
s.rwEventCache = append(s.rwEventCache, RWEventItem{
829+
Event: OriginSlotReadEvent,
830+
Addr: addr,
831+
Slot: slot,
832+
})
833+
s.rwEventCacheIndex++
834+
}
835+
836+
// RecordExecutionDone record the execution done event
837+
func (s *MVStates) RecordExecutionDone() {
838+
if s.rwEventCacheIndex < len(s.rwEventCache) {
839+
s.rwEventCache[s.rwEventCacheIndex].Event = ExecutionDoneEvent
840+
s.rwEventCacheIndex++
841+
return
842+
}
843+
s.rwEventCache = append(s.rwEventCache, RWEventItem{
844+
Event: ExecutionDoneEvent,
845+
})
846+
s.rwEventCacheIndex++
847+
}
848+
698849
func (s *MVStates) BatchRecordHandle() {
699850
if !s.asyncRunning || s.rwEventCacheIndex == 0 {
700851
return
@@ -706,12 +857,22 @@ func (s *MVStates) BatchRecordHandle() {
706857
}
707858

708859
func (s *MVStates) stopAsyncRecorder() {
860+
s.lock.Lock()
861+
defer s.lock.Unlock()
709862
if s.asyncRunning {
710863
s.BatchRecordHandle()
711864
s.asyncRunning = false
712865
close(s.rwEventCh)
713866
rwEventCachePool.Put(&s.rwEventCache)
714-
s.asyncWG.Wait()
867+
}
868+
}
869+
870+
func (s *MVStates) stopAsyncWitness() {
871+
s.lock.Lock()
872+
defer s.lock.Unlock()
873+
if s.asyncWitnessRunning {
874+
close(s.witnessEventCh)
875+
s.asyncWitnessRunning = false
715876
}
716877
}
717878

@@ -916,12 +1077,12 @@ func (s *MVStates) querySlotReads(addr common.Address, slot common.Hash) *RWTxLi
9161077
// resolveDepsMapCacheByWrites must be executed in order
9171078
func (s *MVStates) resolveDepsMapCacheByWrites(index int, reads []RWEventItem, writes []RWEventItem) {
9181079
for index >= len(s.txDepCache) {
919-
s.txDepCache = append(s.txDepCache, TxDep{})
1080+
s.txDepCache = append(s.txDepCache, types.TxDep{})
9201081
}
9211082
rwSet := s.rwSets[index]
9221083
// analysis dep, if the previous transaction is not executed/validated, re-analysis is required
9231084
if rwSet.excludedTx {
924-
s.txDepCache[index] = NewTxDep([]uint64{}, ExcludedTxFlag)
1085+
s.txDepCache[index] = types.NewTxDep([]uint64{}, types.ExcludedTxFlag)
9251086
return
9261087
}
9271088
depSlice := NewTxDepSlice(1)
@@ -1004,12 +1165,12 @@ func (s *MVStates) resolveDepsMapCacheByWrites(index int, reads []RWEventItem, w
10041165
for _, tx := range removed {
10051166
depSlice.remove(tx)
10061167
}
1007-
s.txDepCache[index] = NewTxDep(depSlice.deps())
1168+
s.txDepCache[index] = types.NewTxDep(depSlice.deps())
10081169
}
10091170

10101171
// ResolveTxDAG generate TxDAG from RWSets
1011-
func (s *MVStates) ResolveTxDAG(txCnt int, extraTxDeps ...TxDep) (TxDAG, error) {
1012-
s.stopAsyncRecorder()
1172+
func (s *MVStates) ResolveTxDAG(txCnt int, extraTxDeps ...types.TxDep) (types.TxDAG, error) {
1173+
s.Stop()
10131174

10141175
s.lock.Lock()
10151176
defer s.lock.Unlock()
@@ -1020,10 +1181,10 @@ func (s *MVStates) ResolveTxDAG(txCnt int, extraTxDeps ...TxDep) (TxDAG, error)
10201181
totalCnt := txCnt + len(extraTxDeps)
10211182
for i := 0; i < txCnt; i++ {
10221183
if s.rwSets[i].cannotGasFeeDelay {
1023-
return NewEmptyTxDAG(), nil
1184+
return types.NewEmptyTxDAG(), nil
10241185
}
10251186
}
1026-
txDAG := &PlainTxDAG{
1187+
txDAG := &types.PlainTxDAG{
10271188
TxDeps: s.txDepCache,
10281189
}
10291190
if len(extraTxDeps) > 0 {
@@ -1034,7 +1195,7 @@ func (s *MVStates) ResolveTxDAG(txCnt int, extraTxDeps ...TxDep) (TxDAG, error)
10341195
continue
10351196
}
10361197
// if tx deps larger than half of txs, then convert with NonDependentRelFlag
1037-
txDAG.TxDeps[i].SetFlag(NonDependentRelFlag)
1198+
txDAG.TxDeps[i].SetFlag(types.NonDependentRelFlag)
10381199
nd := make([]uint64, 0, totalCnt-1-len(txDAG.TxDeps[i].TxIndexes))
10391200
for j := uint64(0); j < uint64(i); j++ {
10401201
if !slices.Contains(txDAG.TxDeps[i].TxIndexes, j) {
@@ -1047,6 +1208,15 @@ func (s *MVStates) ResolveTxDAG(txCnt int, extraTxDeps ...TxDep) (TxDAG, error)
10471208
return txDAG, nil
10481209
}
10491210

1211+
// ResolveROTrieWitness generate ROTrieWitness from RWSets
1212+
func (s *MVStates) ResolveROTrieWitness() ([]map[string]struct{}, error) {
1213+
s.Stop()
1214+
1215+
s.lock.Lock()
1216+
defer s.lock.Unlock()
1217+
return s.witnessCache, nil
1218+
}
1219+
10501220
func (s *MVStates) FeeReceivers() []common.Address {
10511221
return s.gasFeeReceivers
10521222
}

0 commit comments

Comments
 (0)