Skip to content

Commit 475f08d

Browse files
authored
refactor: Decouple feeder gateway from Synchronizer (#2956)
1 parent 5c2e570 commit 475f08d

File tree

7 files changed

+174
-90
lines changed

7 files changed

+174
-90
lines changed

node/node.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,8 @@ func New(cfg *Config, version string, logLevel *utils.LogLevel) (*Node, error) {
223223
WithLogger(log).
224224
WithTimeouts(timeouts, fixed).
225225
WithAPIKey(cfg.GatewayAPIKey)
226-
synchronizer = sync.New(chain, adaptfeeder.New(client), log, cfg.PendingPollInterval, dbIsRemote, database)
226+
feederGatewayDataSource := sync.NewFeederGatewayDataSource(chain, adaptfeeder.New(client))
227+
synchronizer = sync.New(chain, feederGatewayDataSource, log, cfg.PendingPollInterval, dbIsRemote, database)
227228
synchronizer.WithPlugin(junoPlugin)
228229
chain.WithPendingBlockFn(synchronizer.PendingBlock)
229230
gatewayClient = gateway.NewClient(cfg.Network.GatewayURL, log).WithUserAgent(ua).WithAPIKey(cfg.GatewayAPIKey)

node/node_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ func TestNetworkVerificationOnNonEmptyDB(t *testing.T) {
7373
require.NoError(t, err)
7474
chain := blockchain.New(database, &network)
7575
ctx, cancel := context.WithCancel(t.Context())
76-
syncer := sync.New(chain, adaptfeeder.New(feeder.NewTestClient(t, &network)), log, 0, false, database).WithListener(&sync.SelectiveListener{OnSyncStepDoneCb: func(op string, _ uint64, _ time.Duration) {
76+
dataSource := sync.NewFeederGatewayDataSource(chain, adaptfeeder.New(feeder.NewTestClient(t, &network)))
77+
syncer := sync.New(chain, dataSource, log, 0, false, database).WithListener(&sync.SelectiveListener{OnSyncStepDoneCb: func(op string, _ uint64, _ time.Duration) {
7778
// Stop the syncer after we successfully stored block.
7879
if op == sync.OpStore {
7980
cancel()

plugin/plugin_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ func TestPlugin(t *testing.T) {
3939
plugin.EXPECT().NewBlock(block, su, gomock.Any())
4040
}
4141
bc := blockchain.New(testDB, &utils.Integration)
42-
synchronizer := sync.New(bc, integGw, utils.NewNopZapLogger(), 0, false, nil).WithPlugin(plugin)
42+
dataSource := sync.NewFeederGatewayDataSource(bc, integGw)
43+
synchronizer := sync.New(bc, dataSource, utils.NewNopZapLogger(), 0, false, nil).WithPlugin(plugin)
4344

4445
ctx, cancel := context.WithTimeout(t.Context(), timeout)
4546
require.NoError(t, synchronizer.Run(ctx))
@@ -66,7 +67,8 @@ func TestPlugin(t *testing.T) {
6667
plugin.EXPECT().NewBlock(block, su, gomock.Any())
6768
}
6869

69-
synchronizer = sync.New(bc, mainGw, utils.NewNopZapLogger(), 0, false, nil).WithPlugin(plugin)
70+
dataSource := sync.NewFeederGatewayDataSource(bc, mainGw)
71+
synchronizer = sync.New(bc, dataSource, utils.NewNopZapLogger(), 0, false, nil).WithPlugin(plugin)
7072
ctx, cancel = context.WithTimeout(t.Context(), timeout)
7173
require.NoError(t, synchronizer.Run(ctx))
7274
cancel()

rpc/v8/storage_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,8 @@ func TestStorageProof_StorageRoots(t *testing.T) {
592592
log := utils.NewNopZapLogger()
593593
testDB := memory.New()
594594
bc := blockchain.New(testDB, &utils.Mainnet)
595-
synchronizer := sync.New(bc, gw, log, time.Duration(0), false, testDB)
595+
dataSource := sync.NewFeederGatewayDataSource(bc, gw)
596+
synchronizer := sync.New(bc, dataSource, log, time.Duration(0), false, testDB)
596597
ctx, cancel := context.WithTimeout(t.Context(), time.Second)
597598

598599
require.NoError(t, synchronizer.Run(ctx))

sync/data_source.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package sync
2+
3+
import (
4+
"context"
5+
"errors"
6+
7+
"github.com/NethermindEth/juno/blockchain"
8+
"github.com/NethermindEth/juno/core"
9+
"github.com/NethermindEth/juno/core/felt"
10+
"github.com/NethermindEth/juno/db"
11+
"github.com/NethermindEth/juno/starknetdata"
12+
"github.com/NethermindEth/juno/utils"
13+
)
14+
15+
type CommittedBlock struct {
16+
Block *core.Block
17+
StateUpdate *core.StateUpdate
18+
NewClasses map[felt.Felt]core.Class
19+
}
20+
21+
type DataSource interface {
22+
BlockByNumber(ctx context.Context, blockNumber uint64) (CommittedBlock, error)
23+
BlockLatest(ctx context.Context) (*core.Block, error)
24+
BlockPending(ctx context.Context) (Pending, error)
25+
}
26+
27+
type feederGatewayDataSource struct {
28+
blockchain *blockchain.Blockchain
29+
starknetData starknetdata.StarknetData
30+
}
31+
32+
func NewFeederGatewayDataSource(blockchain *blockchain.Blockchain, starknetData starknetdata.StarknetData) DataSource {
33+
return &feederGatewayDataSource{
34+
blockchain: blockchain,
35+
starknetData: starknetData,
36+
}
37+
}
38+
39+
func (f *feederGatewayDataSource) BlockByNumber(ctx context.Context, blockNumber uint64) (CommittedBlock, error) {
40+
stateUpdate, block, err := f.starknetData.StateUpdateWithBlock(ctx, blockNumber)
41+
if err != nil {
42+
return CommittedBlock{}, err
43+
}
44+
45+
newClasses, err := f.fetchUnknownClasses(ctx, stateUpdate)
46+
if err != nil {
47+
return CommittedBlock{}, err
48+
}
49+
50+
return CommittedBlock{
51+
Block: block,
52+
StateUpdate: stateUpdate,
53+
NewClasses: newClasses,
54+
}, nil
55+
}
56+
57+
func (f *feederGatewayDataSource) BlockLatest(ctx context.Context) (*core.Block, error) {
58+
return f.starknetData.BlockLatest(ctx)
59+
}
60+
61+
func (f *feederGatewayDataSource) BlockPending(ctx context.Context) (Pending, error) {
62+
pendingStateUpdate, pendingBlock, err := f.starknetData.StateUpdatePendingWithBlock(ctx)
63+
if err != nil {
64+
return Pending{}, err
65+
}
66+
67+
newClasses, err := f.fetchUnknownClasses(ctx, pendingStateUpdate)
68+
if err != nil {
69+
return Pending{}, err
70+
}
71+
72+
return Pending{
73+
Block: pendingBlock,
74+
StateUpdate: pendingStateUpdate,
75+
NewClasses: newClasses,
76+
}, nil
77+
}
78+
79+
func (f *feederGatewayDataSource) fetchUnknownClasses(
80+
ctx context.Context,
81+
stateUpdate *core.StateUpdate,
82+
) (map[felt.Felt]core.Class, error) {
83+
state, closer, err := f.blockchain.HeadState()
84+
if err != nil {
85+
// if err is db.ErrKeyNotFound we are on an empty DB
86+
if !errors.Is(err, db.ErrKeyNotFound) {
87+
return nil, err
88+
}
89+
closer = func() error {
90+
return nil
91+
}
92+
}
93+
94+
newClasses := make(map[felt.Felt]core.Class)
95+
fetchIfNotFound := func(classHash *felt.Felt) error {
96+
if _, ok := newClasses[*classHash]; ok {
97+
return nil
98+
}
99+
100+
stateErr := db.ErrKeyNotFound
101+
if state != nil {
102+
_, stateErr = state.Class(classHash)
103+
}
104+
105+
if errors.Is(stateErr, db.ErrKeyNotFound) {
106+
class, fetchErr := f.starknetData.Class(ctx, classHash)
107+
if fetchErr == nil {
108+
newClasses[*classHash] = class
109+
}
110+
return fetchErr
111+
}
112+
return stateErr
113+
}
114+
115+
for _, classHash := range stateUpdate.StateDiff.DeployedContracts {
116+
if err = fetchIfNotFound(classHash); err != nil {
117+
return nil, utils.RunAndWrapOnError(closer, err)
118+
}
119+
}
120+
for _, classHash := range stateUpdate.StateDiff.DeclaredV0Classes {
121+
if err = fetchIfNotFound(classHash); err != nil {
122+
return nil, utils.RunAndWrapOnError(closer, err)
123+
}
124+
}
125+
for classHash := range stateUpdate.StateDiff.DeclaredV1Classes {
126+
if err = fetchIfNotFound(&classHash); err != nil {
127+
return nil, utils.RunAndWrapOnError(closer, err)
128+
}
129+
}
130+
131+
return newClasses, closer()
132+
}

sync/sync.go

Lines changed: 16 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/NethermindEth/juno/feed"
1616
junoplugin "github.com/NethermindEth/juno/plugin"
1717
"github.com/NethermindEth/juno/service"
18-
"github.com/NethermindEth/juno/starknetdata"
1918
"github.com/NethermindEth/juno/utils"
2019
"github.com/sourcegraph/conc/stream"
2120
)
@@ -117,7 +116,7 @@ type Synchronizer struct {
117116
blockchain *blockchain.Blockchain
118117
db db.KeyValueStore
119118
readOnlyBlockchain bool
120-
starknetData starknetdata.StarknetData
119+
dataSource DataSource
121120
startingBlockNumber *uint64
122121
highestBlockHeader atomic.Pointer[core.Header]
123122
newHeads *feed.Feed[*core.Block]
@@ -135,13 +134,18 @@ type Synchronizer struct {
135134
currReorg *ReorgBlockRange // If nil, no reorg is happening
136135
}
137136

138-
func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, log utils.SimpleLogger,
139-
pendingPollInterval time.Duration, readOnlyBlockchain bool, database db.KeyValueStore,
137+
func New(
138+
bc *blockchain.Blockchain,
139+
dataSource DataSource,
140+
log utils.SimpleLogger,
141+
pendingPollInterval time.Duration,
142+
readOnlyBlockchain bool,
143+
database db.KeyValueStore,
140144
) *Synchronizer {
141145
s := &Synchronizer{
142146
blockchain: bc,
147+
dataSource: dataSource,
143148
db: database,
144-
starknetData: starkNetData,
145149
log: log,
146150
newHeads: feed.New[*core.Block](),
147151
reorgFeed: feed.New[*ReorgBlockRange](),
@@ -179,77 +183,20 @@ func (s *Synchronizer) fetcherTask(ctx context.Context, height uint64, verifiers
179183
case <-ctx.Done():
180184
return func() {}
181185
default:
182-
stateUpdate, block, err := s.starknetData.StateUpdateWithBlock(ctx, height)
183-
if err != nil {
184-
continue
185-
}
186-
187-
newClasses, err := s.fetchUnknownClasses(ctx, stateUpdate)
186+
committedBlock, err := s.dataSource.BlockByNumber(ctx, height)
188187
if err != nil {
189188
continue
190189
}
191190

192191
return func() {
193192
verifiers.Go(func() stream.Callback {
194-
return s.verifierTask(ctx, block, stateUpdate, newClasses, resetStreams)
193+
return s.verifierTask(ctx, committedBlock.Block, committedBlock.StateUpdate, committedBlock.NewClasses, resetStreams)
195194
})
196195
}
197196
}
198197
}
199198
}
200199

201-
func (s *Synchronizer) fetchUnknownClasses(ctx context.Context, stateUpdate *core.StateUpdate) (map[felt.Felt]core.Class, error) {
202-
state, closer, err := s.blockchain.HeadState()
203-
if err != nil {
204-
// if err is db.ErrKeyNotFound we are on an empty DB
205-
if !errors.Is(err, db.ErrKeyNotFound) {
206-
return nil, err
207-
}
208-
closer = func() error {
209-
return nil
210-
}
211-
}
212-
213-
newClasses := make(map[felt.Felt]core.Class)
214-
fetchIfNotFound := func(classHash *felt.Felt) error {
215-
if _, ok := newClasses[*classHash]; ok {
216-
return nil
217-
}
218-
219-
stateErr := db.ErrKeyNotFound
220-
if state != nil {
221-
_, stateErr = state.Class(classHash)
222-
}
223-
224-
if errors.Is(stateErr, db.ErrKeyNotFound) {
225-
class, fetchErr := s.starknetData.Class(ctx, classHash)
226-
if fetchErr == nil {
227-
newClasses[*classHash] = class
228-
}
229-
return fetchErr
230-
}
231-
return stateErr
232-
}
233-
234-
for _, classHash := range stateUpdate.StateDiff.DeployedContracts {
235-
if err = fetchIfNotFound(classHash); err != nil {
236-
return nil, utils.RunAndWrapOnError(closer, err)
237-
}
238-
}
239-
for _, classHash := range stateUpdate.StateDiff.DeclaredV0Classes {
240-
if err = fetchIfNotFound(classHash); err != nil {
241-
return nil, utils.RunAndWrapOnError(closer, err)
242-
}
243-
}
244-
for classHash := range stateUpdate.StateDiff.DeclaredV1Classes {
245-
if err = fetchIfNotFound(&classHash); err != nil {
246-
return nil, utils.RunAndWrapOnError(closer, err)
247-
}
248-
}
249-
250-
return newClasses, closer()
251-
}
252-
253200
func (s *Synchronizer) handlePluginRevertBlock() {
254201
fromBlock, err := s.blockchain.Head()
255202
if err != nil {
@@ -519,7 +466,7 @@ func (s *Synchronizer) pollLatest(ctx context.Context, sem chan struct{}) {
519466
defer func() {
520467
<-sem
521468
}()
522-
highestBlock, err := s.starknetData.BlockLatest(ctx)
469+
highestBlock, err := s.dataSource.BlockLatest(ctx)
523470
if err != nil {
524471
s.log.Warnw("Failed fetching latest block", "err", err)
525472
} else {
@@ -560,22 +507,14 @@ func (s *Synchronizer) fetchAndStorePending(ctx context.Context) error {
560507
return nil
561508
}
562509

563-
pendingStateUpdate, pendingBlock, err := s.starknetData.StateUpdatePendingWithBlock(ctx)
510+
pending, err := s.dataSource.BlockPending(ctx)
564511
if err != nil {
565512
return err
566513
}
514+
pending.Block.Number = head.Number + 1
567515

568-
pendingBlock.Number = head.Number + 1
569-
newClasses, err := s.fetchUnknownClasses(ctx, pendingStateUpdate)
570-
if err != nil {
571-
return err
572-
}
573-
s.log.Debugw("Found pending block", "txns", pendingBlock.TransactionCount)
574-
return s.StorePending(&Pending{
575-
Block: pendingBlock,
576-
StateUpdate: pendingStateUpdate,
577-
NewClasses: newClasses,
578-
})
516+
s.log.Debugw("Found pending block", "txns", pending.Block.TransactionCount)
517+
return s.StorePending(&pending)
579518
}
580519

581520
func (s *Synchronizer) StartingBlockNumber() (uint64, error) {

0 commit comments

Comments
 (0)