Skip to content

Commit b9acd62

Browse files
AlexeyAkhunovAlexey Sharp
andauthored
Embed header/bodies download into a mini-staged sync (#1498)
* Embed header/bodies download into a mini-staged sync * Move code around to prevent import cycles * Include new files * Fix lint * Fix lint * Fix lint Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
1 parent 6e56097 commit b9acd62

File tree

13 files changed

+299
-126
lines changed

13 files changed

+299
-126
lines changed

cmd/headers/download/downloader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ func (cs *ControlServerImpl) newBlock(ctx context.Context, inreq *proto_sentry.I
530530
}
531531

532532
func (cs *ControlServerImpl) blockBodies(inreq *proto_sentry.InboundMessage) error {
533-
var request eth.BlockBodiesData
533+
var request []*types.Body
534534
if err := rlp.DecodeBytes(inreq.Data, &request); err != nil {
535535
return fmt.Errorf("decode BlockBodies: %v", err)
536536
}

cmd/headers/download/sentry.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,9 @@ func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *prot
532532
rwRaw, _ := ss.peerRwMap.Load(peerID)
533533
rw, _ := rwRaw.(p2p.MsgReadWriter)
534534
if rw == nil {
535+
ss.peerHeightMap.Delete(peerID)
536+
ss.peerTimeMap.Delete(peerID)
537+
ss.peerRwMap.Delete(peerID)
535538
return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageByMinBlock find rw for peer %s", peerID)
536539
}
537540
var msgcode uint64

eth/stagedsync/all_stages.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func createStageBuilders(blocks []*types.Block, blockNum uint64, checkRoot bool)
2626
ID: stages.BlockHashes,
2727
Description: "Write block hashes",
2828
ExecFunc: func(s *StageState, u Unwinder) error {
29-
return SpawnBlockHashStage(s, world.db, world.tmpdir, world.QuitCh)
29+
return SpawnBlockHashStage(s, world.db, world.TmpDir, world.QuitCh)
3030
},
3131
UnwindFunc: func(u *UnwindState, s *StageState) error {
3232
return u.Done(world.db)
@@ -71,7 +71,7 @@ func createStageBuilders(blocks []*types.Block, blockNum uint64, checkRoot bool)
7171
ReadChLen: 4,
7272
Now: time.Now(),
7373
}
74-
return SpawnRecoverSendersStage(cfg, s, world.TX, world.chainConfig, 0, world.tmpdir, world.QuitCh)
74+
return SpawnRecoverSendersStage(cfg, s, world.TX, world.chainConfig, 0, world.TmpDir, world.QuitCh)
7575
},
7676
UnwindFunc: func(u *UnwindState, s *StageState) error {
7777
return UnwindSendersStage(u, s, world.TX)
@@ -112,10 +112,10 @@ func createStageBuilders(blocks []*types.Block, blockNum uint64, checkRoot bool)
112112
ID: stages.HashState,
113113
Description: "Hash the key in the state",
114114
ExecFunc: func(s *StageState, u Unwinder) error {
115-
return SpawnHashStateStage(s, world.TX, world.tmpdir, world.QuitCh)
115+
return SpawnHashStateStage(s, world.TX, world.TmpDir, world.QuitCh)
116116
},
117117
UnwindFunc: func(u *UnwindState, s *StageState) error {
118-
return UnwindHashStateStage(u, s, world.TX, world.tmpdir, world.QuitCh)
118+
return UnwindHashStateStage(u, s, world.TX, world.TmpDir, world.QuitCh)
119119
},
120120
}
121121
},
@@ -160,10 +160,10 @@ func createStageBuilders(blocks []*types.Block, blockNum uint64, checkRoot bool)
160160
}
161161
c.Close()
162162
*/
163-
return SpawnIntermediateHashesStage(s, world.TX, checkRoot /* checkRoot */, world.tmpdir, world.QuitCh)
163+
return SpawnIntermediateHashesStage(s, world.TX, checkRoot /* checkRoot */, world.TmpDir, world.QuitCh)
164164
},
165165
UnwindFunc: func(u *UnwindState, s *StageState) error {
166-
return UnwindIntermediateHashesStage(u, s, world.TX, world.tmpdir, world.QuitCh)
166+
return UnwindIntermediateHashesStage(u, s, world.TX, world.TmpDir, world.QuitCh)
167167
},
168168
}
169169
},
@@ -177,7 +177,7 @@ func createStageBuilders(blocks []*types.Block, blockNum uint64, checkRoot bool)
177177
Disabled: !world.storageMode.History,
178178
DisabledDescription: "Enable by adding `h` to --storage-mode",
179179
ExecFunc: func(s *StageState, u Unwinder) error {
180-
return SpawnAccountHistoryIndex(s, world.TX, world.tmpdir, world.QuitCh)
180+
return SpawnAccountHistoryIndex(s, world.TX, world.TmpDir, world.QuitCh)
181181
},
182182
UnwindFunc: func(u *UnwindState, s *StageState) error {
183183
return UnwindAccountHistoryIndex(u, s, world.TX, world.QuitCh)
@@ -194,7 +194,7 @@ func createStageBuilders(blocks []*types.Block, blockNum uint64, checkRoot bool)
194194
Disabled: !world.storageMode.History,
195195
DisabledDescription: "Enable by adding `h` to --storage-mode",
196196
ExecFunc: func(s *StageState, u Unwinder) error {
197-
return SpawnStorageHistoryIndex(s, world.TX, world.tmpdir, world.QuitCh)
197+
return SpawnStorageHistoryIndex(s, world.TX, world.TmpDir, world.QuitCh)
198198
},
199199
UnwindFunc: func(u *UnwindState, s *StageState) error {
200200
return UnwindStorageHistoryIndex(u, s, world.TX, world.QuitCh)
@@ -211,7 +211,7 @@ func createStageBuilders(blocks []*types.Block, blockNum uint64, checkRoot bool)
211211
Disabled: !world.storageMode.Receipts,
212212
DisabledDescription: "Enable by adding `r` to --storage-mode",
213213
ExecFunc: func(s *StageState, u Unwinder) error {
214-
return SpawnLogIndex(s, world.TX, world.tmpdir, world.QuitCh)
214+
return SpawnLogIndex(s, world.TX, world.TmpDir, world.QuitCh)
215215
},
216216
UnwindFunc: func(u *UnwindState, s *StageState) error {
217217
return UnwindLogIndex(u, s, world.TX, world.QuitCh)
@@ -228,7 +228,7 @@ func createStageBuilders(blocks []*types.Block, blockNum uint64, checkRoot bool)
228228
Disabled: !world.storageMode.CallTraces,
229229
DisabledDescription: "Work In Progress",
230230
ExecFunc: func(s *StageState, u Unwinder) error {
231-
return SpawnCallTraces(s, world.TX, world.chainConfig, world.chainContext, world.tmpdir, world.QuitCh,
231+
return SpawnCallTraces(s, world.TX, world.chainConfig, world.chainContext, world.TmpDir, world.QuitCh,
232232
CallTracesStageParams{
233233
CacheSize: world.cacheSize,
234234
BatchSize: world.batchSize,
@@ -253,10 +253,10 @@ func createStageBuilders(blocks []*types.Block, blockNum uint64, checkRoot bool)
253253
Disabled: !world.storageMode.TxIndex,
254254
DisabledDescription: "Enable by adding `t` to --storage-mode",
255255
ExecFunc: func(s *StageState, u Unwinder) error {
256-
return SpawnTxLookup(s, world.TX, world.tmpdir, world.QuitCh)
256+
return SpawnTxLookup(s, world.TX, world.TmpDir, world.QuitCh)
257257
},
258258
UnwindFunc: func(u *UnwindState, s *StageState) error {
259-
return UnwindTxLookup(u, s, world.TX, world.tmpdir, world.QuitCh)
259+
return UnwindTxLookup(u, s, world.TX, world.TmpDir, world.QuitCh)
260260
},
261261
}
262262
},

eth/stagedsync/stage.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ type StageState struct {
4040
BlockNumber uint64
4141
}
4242

43+
func (s *StageState) LogPrefix() string {
44+
return s.state.LogPrefix()
45+
}
46+
4347
// Update updates the stage state (current block number) in the database. Can be called multiple times during stage execution.
4448
func (s *StageState) Update(db ethdb.Putter, newBlockNum uint64) error {
4549
return stages.SaveStageProgress(db, s.Stage, newBlockNum)

eth/stagedsync/stage_blockhashes.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package stagedsync
22

33
import (
4+
"context"
45
"encoding/binary"
56
"fmt"
67

@@ -21,11 +22,24 @@ func extractHeaders(k []byte, v []byte, next etl.ExtractNextFunc) error {
2122
}
2223

2324
func SpawnBlockHashStage(s *StageState, db ethdb.Database, tmpdir string, quit <-chan struct{}) error {
24-
headNumber, err := stages.GetStageProgress(db, stages.Headers)
25+
var tx ethdb.DbWithPendingMutations
26+
var useExternalTx bool
27+
if hasTx, ok := db.(ethdb.HasTx); ok && hasTx.Tx() != nil {
28+
tx = db.(ethdb.DbWithPendingMutations)
29+
useExternalTx = true
30+
} else {
31+
var err error
32+
tx, err = db.Begin(context.Background(), ethdb.RW)
33+
if err != nil {
34+
return err
35+
}
36+
defer tx.Rollback()
37+
}
38+
headNumber, err := stages.GetStageProgress(tx, stages.Headers)
2539
if err != nil {
2640
return fmt.Errorf("getting headers progress: %w", err)
2741
}
28-
headHash := rawdb.ReadHeaderByNumber(db, headNumber).Hash()
42+
headHash := rawdb.ReadHeaderByNumber(tx, headNumber).Hash()
2943
if s.BlockNumber == headNumber {
3044
s.Done()
3145
return nil
@@ -38,7 +52,7 @@ func SpawnBlockHashStage(s *StageState, db ethdb.Database, tmpdir string, quit <
3852
logPrefix := s.state.LogPrefix()
3953
if err := etl.Transform(
4054
logPrefix,
41-
db,
55+
tx,
4256
dbutils.HeaderPrefix,
4357
dbutils.HeaderNumberPrefix,
4458
tmpdir,
@@ -52,5 +66,13 @@ func SpawnBlockHashStage(s *StageState, db ethdb.Database, tmpdir string, quit <
5266
); err != nil {
5367
return err
5468
}
55-
return s.DoneAndUpdate(db, headNumber)
69+
if err := s.DoneAndUpdate(tx, headNumber); err != nil {
70+
return err
71+
}
72+
if !useExternalTx {
73+
if _, err := tx.Commit(); err != nil {
74+
return err
75+
}
76+
}
77+
return nil
5678
}
Lines changed: 35 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package bodydownload
1+
package stagedsync
22

33
import (
44
"context"
@@ -12,38 +12,21 @@ import (
1212
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
1313
"github.com/ledgerwatch/turbo-geth/ethdb"
1414
"github.com/ledgerwatch/turbo-geth/log"
15+
"github.com/ledgerwatch/turbo-geth/turbo/stages/bodydownload"
1516
)
1617

17-
const (
18-
logInterval = 30 * time.Second
19-
)
20-
21-
// Forward progresses Bodies stage in the forward direction
22-
func Forward(
23-
logPrefix string,
18+
// BodiesForward progresses Bodies stage in the forward direction
19+
func BodiesForward(
20+
s *StageState,
2421
ctx context.Context,
2522
db ethdb.Database,
26-
bd *BodyDownload,
27-
bodyReqSend func(context.Context, *BodyRequest) []byte,
23+
bd *bodydownload.BodyDownload,
24+
bodyReqSend func(context.Context, *bodydownload.BodyRequest) []byte,
2825
penalise func(context.Context, []byte),
2926
updateHead func(ctx context.Context, head uint64, hash common.Hash, td *big.Int),
3027
wakeUpChan chan struct{}, timeout int) error {
31-
// This will update bd.maxProgress
32-
if _, _, _, err := bd.UpdateFromDb(db); err != nil {
33-
return err
34-
}
35-
var headerProgress, bodyProgress uint64
36-
var err error
37-
headerProgress, err = stages.GetStageProgress(db, stages.Headers)
38-
if err != nil {
39-
return err
40-
}
41-
bodyProgress, err = stages.GetStageProgress(db, stages.Bodies)
42-
if err != nil {
43-
return err
44-
}
45-
log.Info(fmt.Sprintf("[%s] Processing bodies...", logPrefix), "from", bodyProgress, "to", headerProgress)
4628
var tx ethdb.DbWithPendingMutations
29+
var err error
4730
var useExternalTx bool
4831
if hasTx, ok := db.(ethdb.HasTx); ok && hasTx.Tx() != nil {
4932
tx = db.(ethdb.DbWithPendingMutations)
@@ -55,6 +38,21 @@ func Forward(
5538
}
5639
defer tx.Rollback()
5740
}
41+
// This will update bd.maxProgress
42+
if _, _, _, err = bd.UpdateFromDb(tx); err != nil {
43+
return err
44+
}
45+
var headerProgress, bodyProgress uint64
46+
headerProgress, err = stages.GetStageProgress(tx, stages.Headers)
47+
if err != nil {
48+
return err
49+
}
50+
bodyProgress, err = stages.GetStageProgress(tx, stages.Bodies)
51+
if err != nil {
52+
return err
53+
}
54+
logPrefix := s.LogPrefix()
55+
log.Info(fmt.Sprintf("[%s] Processing bodies...", logPrefix), "from", bodyProgress, "to", headerProgress)
5856
batch := tx.NewBatch()
5957
defer batch.Rollback()
6058
logEvery := time.NewTicker(logInterval)
@@ -63,7 +61,7 @@ func Forward(
6361
var prevWastedCount float64 = 0
6462
timer := time.NewTimer(1 * time.Second) // Check periodically even in the abseence of incoming messages
6563
var blockNum uint64
66-
var req *BodyRequest
64+
var req *bodydownload.BodyRequest
6765
var peer []byte
6866
stopped := false
6967
var headHash common.Hash
@@ -127,7 +125,7 @@ func Forward(
127125
stopped = true
128126
case <-logEvery.C:
129127
deliveredCount, wastedCount := bd.DeliveryCounts()
130-
logProgress(logPrefix, bodyProgress, prevDeliveredCount, deliveredCount, prevWastedCount, wastedCount, batch)
128+
logProgressBodies(logPrefix, bodyProgress, prevDeliveredCount, deliveredCount, prevWastedCount, wastedCount, batch)
131129
prevDeliveredCount = deliveredCount
132130
prevWastedCount = wastedCount
133131
bd.PrintPeerMap()
@@ -140,23 +138,26 @@ func Forward(
140138
if _, err := batch.Commit(); err != nil {
141139
return fmt.Errorf("%s: failed to write batch commit: %v", logPrefix, err)
142140
}
143-
if !useExternalTx {
144-
if _, err := tx.Commit(); err != nil {
145-
return err
146-
}
147-
}
148141
if headSet {
149-
if headTd, err := rawdb.ReadTd(db, headHash, bodyProgress); err == nil {
142+
if headTd, err := rawdb.ReadTd(tx, headHash, bodyProgress); err == nil {
150143
updateHead(ctx, bodyProgress, headHash, headTd)
151144
} else {
152145
log.Error("Failed to get total difficulty", "hash", headHash, "height", bodyProgress, "error", err)
153146
}
154147
}
148+
if err := s.DoneAndUpdate(tx, bodyProgress); err != nil {
149+
return err
150+
}
151+
if !useExternalTx {
152+
if _, err := tx.Commit(); err != nil {
153+
return err
154+
}
155+
}
155156
log.Info("Processed", "highest", bodyProgress)
156157
return nil
157158
}
158159

159-
func logProgress(logPrefix string, committed uint64, prevDeliveredCount, deliveredCount, prevWastedCount, wastedCount float64, batch ethdb.DbWithPendingMutations) {
160+
func logProgressBodies(logPrefix string, committed uint64, prevDeliveredCount, deliveredCount, prevWastedCount, wastedCount float64, batch ethdb.DbWithPendingMutations) {
160161
speed := (deliveredCount - prevDeliveredCount) / float64(logInterval/time.Second)
161162
wastedSpeed := (wastedCount - prevWastedCount) / float64(logInterval/time.Second)
162163
var m runtime.MemStats

0 commit comments

Comments
 (0)