Skip to content

Commit 8f0605d

Browse files
committed
Tendermint: implement mechanism to sync to chain head
1 parent 475f08d commit 8f0605d

File tree

14 files changed

+551
-28
lines changed

14 files changed

+551
-28
lines changed

consensus/driver/driver.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ type Driver[V types.Hashable[H], H types.Hash, A types.Addr] struct {
3232
scheduledTms map[types.Timeout]*time.Timer
3333
timeoutsCh chan types.Timeout
3434

35-
wg sync.WaitGroup
36-
quit chan struct{}
35+
wg sync.WaitGroup
36+
quit chan struct{}
37+
stopSyncCh chan<- struct{}
3738
}
3839

3940
func New[V types.Hashable[H], H types.Hash, A types.Addr](
@@ -44,6 +45,7 @@ func New[V types.Hashable[H], H types.Hash, A types.Addr](
4445
listeners p2p.Listeners[V, H, A],
4546
broadcasters p2p.Broadcasters[V, H, A],
4647
getTimeout timeoutFn,
48+
stopSyncCh chan<- struct{},
4749
) *Driver[V, H, A] {
4850
return &Driver[V, H, A]{
4951
log: log,
@@ -56,6 +58,7 @@ func New[V types.Hashable[H], H types.Hash, A types.Addr](
5658
scheduledTms: make(map[types.Timeout]*time.Timer),
5759
timeoutsCh: make(chan types.Timeout),
5860
quit: make(chan struct{}),
61+
stopSyncCh: stopSyncCh,
5962
}
6063
}
6164

@@ -131,6 +134,12 @@ func (d *Driver[V, H, A]) execute(actions []types.Action[V, H, A]) {
131134
if err := d.db.DeleteWALEntries(action.Height); err != nil {
132135
d.log.Errorw("failed to delete WAL messages during commit", "height", action.Height, "round", action.Round, "err", err)
133136
}
137+
case *types.StopSync:
138+
if d.stopSyncCh != nil {
139+
d.stopSyncCh <- struct{}{}
140+
close(d.stopSyncCh)
141+
d.stopSyncCh = nil // prevent panic
142+
}
134143
}
135144
}
136145
}

consensus/driver/driver_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ func TestDriver(t *testing.T) {
232232
mockListeners(proposalCh, prevoteCh, precommitCh),
233233
broadcasters,
234234
mockTimeoutFn,
235+
nil,
235236
)
236237

237238
inputTimeoutProposal := getRandTimeout(random, types.StepPropose)

consensus/integtest/integ_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func runTest(t *testing.T, cfg testConfig) {
8080
network.getListeners(nodeAddr),
8181
network.getBroadcasters(nodeAddr),
8282
getTimeoutFn(cfg),
83+
nil,
8384
)
8485

8586
driver.Start()

consensus/sync/init_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package sync
2+
3+
import (
4+
_ "github.com/NethermindEth/juno/encoder/registry"
5+
)

consensus/sync/sync.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package sync
2+
3+
import (
4+
"context"
5+
6+
"github.com/NethermindEth/juno/blockchain"
7+
"github.com/NethermindEth/juno/builder"
8+
"github.com/NethermindEth/juno/consensus/proposal"
9+
"github.com/NethermindEth/juno/consensus/types"
10+
"github.com/NethermindEth/juno/core"
11+
"github.com/NethermindEth/juno/core/felt"
12+
p2p "github.com/NethermindEth/juno/p2p"
13+
p2pSync "github.com/NethermindEth/juno/p2p/sync"
14+
"github.com/NethermindEth/juno/sync"
15+
)
16+
17+
type Sync[V types.Hashable[H], H types.Hash, A types.Addr] struct {
18+
syncService p2p.SyncToChannel
19+
driverProposalCh chan types.Proposal[V, H, A]
20+
driverPrecommitCh chan types.Precommit[H, A]
21+
// Todo: for now we can forge the precommit votes of our peers
22+
// In practice, this information needs to be exposed to peers.
23+
getPrecommits func(types.Height) []types.Precommit[H, A]
24+
stopSyncCh <-chan struct{}
25+
toValue func(*felt.Felt) V
26+
toHash func(*felt.Felt) H
27+
proposalStore *proposal.ProposalStore[H]
28+
}
29+
30+
func New[V types.Hashable[H], H types.Hash, A types.Addr](
31+
syncService p2p.SyncToChannel,
32+
driverProposalCh chan types.Proposal[V, H, A],
33+
driverPrecommitCh chan types.Precommit[H, A],
34+
getPrecommits func(types.Height) []types.Precommit[H, A],
35+
stopSyncCh <-chan struct{},
36+
toValue func(*felt.Felt) V,
37+
toHash func(*felt.Felt) H,
38+
proposalStore *proposal.ProposalStore[H],
39+
) Sync[V, H, A] {
40+
return Sync[V, H, A]{
41+
syncService: syncService,
42+
driverProposalCh: driverProposalCh,
43+
driverPrecommitCh: driverPrecommitCh,
44+
getPrecommits: getPrecommits,
45+
stopSyncCh: stopSyncCh,
46+
toValue: toValue,
47+
toHash: toHash,
48+
proposalStore: proposalStore,
49+
}
50+
}
51+
52+
func (s *Sync[V, H, A]) Run(ctx context.Context) {
53+
blockCh := make(chan p2pSync.BlockBody)
54+
55+
syncCtx, syncCancel := context.WithCancel(ctx)
56+
go func() {
57+
err := s.syncService.SyncToChannel(syncCtx, blockCh)
58+
if err != nil {
59+
syncCancel()
60+
return
61+
}
62+
}()
63+
64+
for {
65+
select {
66+
case <-ctx.Done():
67+
syncCancel()
68+
return
69+
case <-s.stopSyncCh:
70+
syncCancel()
71+
return
72+
case committedBlock := <-blockCh:
73+
msgHeader := types.MessageHeader[A]{
74+
Height: types.Height(committedBlock.Block.Number),
75+
Round: -1, // Todo: placeholder until round is placed in the spec
76+
}
77+
msgV := s.toValue(committedBlock.Block.Hash)
78+
msgH := s.toHash(committedBlock.Block.Hash)
79+
80+
precommits := s.getPrecommits(types.Height(committedBlock.Block.Number))
81+
for _, precommit := range precommits {
82+
s.driverPrecommitCh <- precommit
83+
}
84+
85+
msgHeader.Sender = committedBlock.Block.SequencerAddress.Bits()
86+
proposal := types.Proposal[V, H, A]{
87+
MessageHeader: msgHeader,
88+
ValidRound: -1,
89+
Value: &msgV,
90+
}
91+
s.driverProposalCh <- proposal
92+
93+
concatCommitments := core.ConcatCounts(
94+
committedBlock.Block.TransactionCount,
95+
committedBlock.Block.EventCount,
96+
committedBlock.StateUpdate.StateDiff.Length(),
97+
committedBlock.Block.L1DAMode,
98+
)
99+
buildResult := builder.BuildResult{
100+
Pending: &sync.Pending{
101+
Block: committedBlock.Block,
102+
StateUpdate: committedBlock.StateUpdate,
103+
NewClasses: committedBlock.NewClasses,
104+
},
105+
SimulateResult: &blockchain.SimulateResult{
106+
BlockCommitments: committedBlock.Commitments,
107+
ConcatCount: concatCommitments,
108+
},
109+
// Todo: this needs added to the spec.
110+
L2GasConsumed: 1,
111+
}
112+
s.proposalStore.Store(msgH, &buildResult)
113+
}
114+
}
115+
}

consensus/sync/sync_test.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package sync_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/NethermindEth/juno/consensus/driver"
8+
"github.com/NethermindEth/juno/consensus/mocks"
9+
"github.com/NethermindEth/juno/consensus/proposal"
10+
"github.com/NethermindEth/juno/consensus/starknet"
11+
consensusSync "github.com/NethermindEth/juno/consensus/sync"
12+
"github.com/NethermindEth/juno/consensus/types"
13+
"github.com/NethermindEth/juno/core"
14+
"github.com/NethermindEth/juno/core/felt"
15+
"github.com/NethermindEth/juno/p2p/sync"
16+
"github.com/NethermindEth/juno/utils"
17+
"github.com/stretchr/testify/require"
18+
"go.uber.org/mock/gomock"
19+
)
20+
21+
type mockSyncService struct {
22+
inCh chan sync.BlockBody // relays blocks to blockCh
23+
}
24+
25+
func newMockSyncService(inCh chan sync.BlockBody) mockSyncService {
26+
return mockSyncService{
27+
inCh: inCh,
28+
}
29+
}
30+
31+
func (m *mockSyncService) SyncToChannel(ctx context.Context, blockCh chan<- sync.BlockBody) error {
32+
for {
33+
select {
34+
case <-ctx.Done():
35+
return nil
36+
case committedBlock := <-m.inCh:
37+
blockCh <- committedBlock
38+
}
39+
}
40+
}
41+
42+
func (m *mockSyncService) recieveBlock(block sync.BlockBody) {
43+
m.inCh <- block
44+
}
45+
46+
func TestSync(t *testing.T) {
47+
ctrl := gomock.NewController(t)
48+
defer ctrl.Finish()
49+
50+
ctx, cancel := context.WithCancel(t.Context())
51+
defer cancel()
52+
53+
stateMachine := mocks.NewMockStateMachine[starknet.Value, starknet.Hash, starknet.Address](ctrl)
54+
stateMachine.EXPECT().ReplayWAL().AnyTimes().Return() // ignore WAL replay logic here
55+
stateMachine.EXPECT().ProcessStart(types.Round(0)).Return([]types.Action[starknet.Value, starknet.Hash, starknet.Address]{})
56+
stateMachine.EXPECT().ProcessPrecommit(gomock.Any()).Times(3).Return([]types.Action[starknet.Value, starknet.Hash, starknet.Address]{})
57+
stateMachine.EXPECT().ProcessProposal(gomock.Any()).Return(
58+
[]types.Action[starknet.Value, starknet.Hash, starknet.Address]{&types.StopSync{}}, // Pretend we caught the chain head. Commit action ignored here.
59+
)
60+
61+
proposalCh := make(chan starknet.Proposal)
62+
prevoteCh := make(chan starknet.Prevote)
63+
precommitCh := make(chan starknet.Precommit)
64+
broadcasters := mockBroadcasters()
65+
66+
stopSyncCh := make(chan struct{})
67+
68+
driver := driver.New(
69+
utils.NewNopZapLogger(),
70+
newTendermintDB(t),
71+
stateMachine,
72+
newMockBlockchain(t),
73+
mockListeners(proposalCh, prevoteCh, precommitCh),
74+
broadcasters,
75+
mockTimeoutFn,
76+
stopSyncCh,
77+
)
78+
driver.Start()
79+
80+
mockInCh := make(chan sync.BlockBody)
81+
mockSyncService := newMockSyncService(mockInCh)
82+
83+
proposalStore := proposal.ProposalStore[starknet.Hash]{}
84+
85+
consensusSyncService := consensusSync.New(&mockSyncService, proposalCh, precommitCh, getPrecommits, stopSyncCh, toValue, toHash, &proposalStore)
86+
87+
block0 := getCommittedBlock()
88+
block0Hash := toHash(block0.Block.Hash)
89+
go func() {
90+
mockSyncService.recieveBlock(block0)
91+
}()
92+
93+
consensusSyncService.Run(ctx) // Driver should trigger stopSyncCh and shut this service down
94+
require.NotEmpty(t, proposalStore.Get(block0Hash)) // Ensure the Driver sees the correct proposal
95+
_, stopSyncChIsOpen := <-stopSyncCh // Ensure the Driver closed this channel after catching up to the chain head
96+
require.False(t, stopSyncChIsOpen)
97+
}
98+
99+
func getCommittedBlock() sync.BlockBody {
100+
return sync.BlockBody{
101+
Block: &core.Block{
102+
Header: &core.Header{
103+
Hash: new(felt.Felt).SetUint64(1),
104+
TransactionCount: 2,
105+
EventCount: 3,
106+
SequencerAddress: new(felt.Felt).SetUint64(4),
107+
Number: 1,
108+
},
109+
},
110+
StateUpdate: &core.StateUpdate{
111+
StateDiff: &core.StateDiff{},
112+
},
113+
NewClasses: make(map[felt.Felt]core.Class),
114+
Commitments: &core.BlockCommitments{},
115+
}
116+
}
117+
118+
func toValue(in *felt.Felt) starknet.Value {
119+
return starknet.Value(*in)
120+
}
121+
122+
func toHash(in *felt.Felt) starknet.Hash {
123+
return starknet.Hash(*in)
124+
}
125+
126+
func getPrecommits(types.Height) []types.Precommit[starknet.Hash, starknet.Address] {
127+
return []types.Precommit[starknet.Hash, starknet.Address]{
128+
// We don't use the round since it's not present in the spec yet
129+
{
130+
MessageHeader: types.MessageHeader[starknet.Address]{
131+
Height: types.Height(1),
132+
Sender: starknet.Address(*new(felt.Felt).SetUint64(1)),
133+
},
134+
},
135+
{
136+
MessageHeader: types.MessageHeader[starknet.Address]{
137+
Height: types.Height(1),
138+
Sender: starknet.Address(*new(felt.Felt).SetUint64(2)),
139+
},
140+
},
141+
{
142+
MessageHeader: types.MessageHeader[starknet.Address]{
143+
Height: types.Height(1),
144+
Sender: starknet.Address(*new(felt.Felt).SetUint64(3)),
145+
},
146+
},
147+
}
148+
}

0 commit comments

Comments
 (0)