Skip to content

Commit ff2a8b6

Browse files
committed
feat: Implement P2P components
1 parent 81cb646 commit ff2a8b6

File tree

14 files changed

+717
-330
lines changed

14 files changed

+717
-330
lines changed

consensus/driver/driver.go

Lines changed: 42 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package driver
22

33
import (
4-
"sync"
4+
"context"
55
"time"
66

77
"github.com/NethermindEth/juno/consensus/db"
88
"github.com/NethermindEth/juno/consensus/p2p"
9+
"github.com/NethermindEth/juno/consensus/proposer"
910
"github.com/NethermindEth/juno/consensus/tendermint"
1011
"github.com/NethermindEth/juno/consensus/types"
1112
"github.com/NethermindEth/juno/utils"
@@ -23,39 +24,34 @@ type Driver[V types.Hashable[H], H types.Hash, A types.Addr] struct {
2324
db db.TendermintDB[V, H, A]
2425
stateMachine tendermint.StateMachine[V, H, A]
2526
blockchain Blockchain[V, H]
27+
p2p p2p.P2P[V, H, A]
28+
proposer proposer.Proposer[V, H]
2629

2730
getTimeout timeoutFn
2831

29-
listeners p2p.Listeners[V, H, A]
30-
broadcasters p2p.Broadcasters[V, H, A]
31-
3232
scheduledTms map[types.Timeout]*time.Timer
3333
timeoutsCh chan types.Timeout
34-
35-
wg sync.WaitGroup
36-
quit chan struct{}
3734
}
3835

3936
func New[V types.Hashable[H], H types.Hash, A types.Addr](
4037
log utils.Logger,
4138
db db.TendermintDB[V, H, A],
4239
stateMachine tendermint.StateMachine[V, H, A],
4340
blockchain Blockchain[V, H],
44-
listeners p2p.Listeners[V, H, A],
45-
broadcasters p2p.Broadcasters[V, H, A],
41+
p2p p2p.P2P[V, H, A],
42+
proposer proposer.Proposer[V, H],
4643
getTimeout timeoutFn,
47-
) *Driver[V, H, A] {
48-
return &Driver[V, H, A]{
44+
) Driver[V, H, A] {
45+
return Driver[V, H, A]{
4946
log: log,
5047
db: db,
5148
stateMachine: stateMachine,
5249
blockchain: blockchain,
50+
p2p: p2p,
51+
proposer: proposer,
5352
getTimeout: getTimeout,
54-
listeners: listeners,
55-
broadcasters: broadcasters,
5653
scheduledTms: make(map[types.Timeout]*time.Timer),
5754
timeoutsCh: make(chan types.Timeout),
58-
quit: make(chan struct{}),
5955
}
6056
}
6157

@@ -64,60 +60,52 @@ func New[V types.Hashable[H], H types.Hash, A types.Addr](
6460
// these messages and returns a set of actions to be executed by the Driver.
6561
// The Driver executes these actions (namely broadcasting messages
6662
// and triggering scheduled timeouts).
67-
func (d *Driver[V, H, A]) Start() {
63+
func (d *Driver[V, H, A]) Run(ctx context.Context) error {
6864
d.stateMachine.ReplayWAL()
6965

70-
d.wg.Add(1)
71-
go func() {
72-
defer d.wg.Done()
73-
74-
actions := d.stateMachine.ProcessStart(0)
75-
d.execute(actions)
66+
listeners := d.p2p.Listeners()
67+
broadcasters := d.p2p.Broadcasters()
7668

77-
// Todo: check message signature everytime a message is received.
78-
// For the time being it can be assumed the signature is correct.
69+
actions := d.stateMachine.ProcessStart(0)
70+
d.execute(ctx, broadcasters, actions)
7971

80-
for {
81-
select {
82-
case <-d.quit:
83-
return
84-
case tm := <-d.timeoutsCh:
85-
// Handling of timeouts is priorities over messages
86-
delete(d.scheduledTms, tm)
87-
actions = d.stateMachine.ProcessTimeout(tm)
88-
case p := <-d.listeners.ProposalListener.Listen():
89-
actions = d.stateMachine.ProcessProposal(p)
90-
case p := <-d.listeners.PrevoteListener.Listen():
91-
actions = d.stateMachine.ProcessPrevote(p)
92-
case p := <-d.listeners.PrecommitListener.Listen():
93-
actions = d.stateMachine.ProcessPrecommit(p)
72+
// Todo: check message signature everytime a message is received.
73+
// For the time being it can be assumed the signature is correct.
74+
for {
75+
select {
76+
case <-ctx.Done():
77+
for _, tm := range d.scheduledTms {
78+
tm.Stop()
9479
}
95-
d.execute(actions)
80+
return nil
81+
case tm := <-d.timeoutsCh:
82+
// Handling of timeouts is priorities over messages
83+
delete(d.scheduledTms, tm)
84+
actions = d.stateMachine.ProcessTimeout(tm)
85+
case p := <-listeners.ProposalListener.Listen():
86+
actions = d.stateMachine.ProcessProposal(p)
87+
case p := <-listeners.PrevoteListener.Listen():
88+
actions = d.stateMachine.ProcessPrevote(p)
89+
case p := <-listeners.PrecommitListener.Listen():
90+
actions = d.stateMachine.ProcessPrecommit(p)
9691
}
97-
}()
98-
}
99-
100-
func (d *Driver[V, H, A]) Stop() {
101-
close(d.quit)
102-
d.wg.Wait()
103-
for _, tm := range d.scheduledTms {
104-
tm.Stop()
92+
d.execute(ctx, broadcasters, actions)
10593
}
10694
}
10795

108-
func (d *Driver[V, H, A]) execute(actions []types.Action[V, H, A]) {
96+
func (d *Driver[V, H, A]) execute(ctx context.Context, broadcasters p2p.Broadcasters[V, H, A], actions []types.Action[V, H, A]) {
10997
for _, action := range actions {
11098
switch action := action.(type) {
11199
case *types.BroadcastProposal[V, H, A]:
112-
d.broadcasters.ProposalBroadcaster.Broadcast(types.Proposal[V, H, A](*action))
100+
broadcasters.ProposalBroadcaster.Broadcast(types.Proposal[V, H, A](*action))
113101
case *types.BroadcastPrevote[H, A]:
114-
d.broadcasters.PrevoteBroadcaster.Broadcast(types.Prevote[H, A](*action))
102+
broadcasters.PrevoteBroadcaster.Broadcast(types.Prevote[H, A](*action))
115103
case *types.BroadcastPrecommit[H, A]:
116-
d.broadcasters.PrecommitBroadcaster.Broadcast(types.Precommit[H, A](*action))
104+
broadcasters.PrecommitBroadcaster.Broadcast(types.Precommit[H, A](*action))
117105
case *types.ScheduleTimeout:
118106
d.scheduledTms[types.Timeout(*action)] = time.AfterFunc(d.getTimeout(action.Step, action.Round), func() {
119107
select {
120-
case <-d.quit:
108+
case <-ctx.Done():
121109
case d.timeoutsCh <- types.Timeout(*action):
122110
}
123111
})
@@ -126,7 +114,10 @@ func (d *Driver[V, H, A]) execute(actions []types.Action[V, H, A]) {
126114
d.log.Fatalf("failed to flush WAL during commit", "height", action.Height, "round", action.Round, "err", err)
127115
}
128116

117+
d.log.Debugw("Committing", "height", action.Height, "round", action.Round)
129118
d.blockchain.Commit(action.Height, *action.Value)
119+
d.proposer.OnCommit(ctx, action.Height, *action.Value)
120+
d.p2p.OnCommit(ctx, action.Height, *action.Value)
130121

131122
if err := d.db.DeleteWALEntries(action.Height); err != nil {
132123
d.log.Errorw("failed to delete WAL messages during commit", "height", action.Height, "round", action.Round, "err", err)

consensus/driver/driver_test.go

Lines changed: 24 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package driver_test
22

33
import (
4+
"context"
45
"math/rand"
5-
"sync"
66
"testing"
77
"time"
88

@@ -16,6 +16,7 @@ import (
1616
"github.com/NethermindEth/juno/core/hash"
1717
"github.com/NethermindEth/juno/db/pebble"
1818
"github.com/NethermindEth/juno/utils"
19+
"github.com/sourcegraph/conc"
1920
"github.com/stretchr/testify/assert"
2021
"github.com/stretchr/testify/require"
2122
"go.uber.org/mock/gomock"
@@ -40,70 +41,6 @@ type expectedBroadcast struct {
4041
precommits []starknet.Precommit
4142
}
4243

43-
type mockListener[M starknet.Message] struct {
44-
ch chan M
45-
}
46-
47-
func newMockListener[M starknet.Message](ch chan M) *mockListener[M] {
48-
return &mockListener[M]{
49-
ch: ch,
50-
}
51-
}
52-
53-
func (m *mockListener[M]) Listen() <-chan M {
54-
return m.ch
55-
}
56-
57-
type mockBroadcaster[M starknet.Message] struct {
58-
wg sync.WaitGroup
59-
mu sync.Mutex
60-
broadcastedMessages []M
61-
}
62-
63-
func (m *mockBroadcaster[M]) Broadcast(msg M) {
64-
m.mu.Lock()
65-
defer m.mu.Unlock()
66-
m.broadcastedMessages = append(m.broadcastedMessages, msg)
67-
m.wg.Done()
68-
}
69-
70-
type mockBlockchain struct {
71-
t *testing.T
72-
expectedCommit *starknet.Commit
73-
}
74-
75-
func (m *mockBlockchain) Commit(height types.Height, value starknet.Value) {
76-
require.Equal(m.t, m.expectedCommit.Value, &value)
77-
require.Equal(m.t, m.expectedCommit.Height, height)
78-
}
79-
80-
func newMockBlockchain(t *testing.T, expectedCommit *starknet.Commit) blockchain {
81-
return &mockBlockchain{
82-
t: t,
83-
expectedCommit: expectedCommit,
84-
}
85-
}
86-
87-
func mockListeners(
88-
proposalCh chan starknet.Proposal,
89-
prevoteCh chan starknet.Prevote,
90-
precommitCh chan starknet.Precommit,
91-
) listeners {
92-
return listeners{
93-
ProposalListener: newMockListener(proposalCh),
94-
PrevoteListener: newMockListener(prevoteCh),
95-
PrecommitListener: newMockListener(precommitCh),
96-
}
97-
}
98-
99-
func mockBroadcasters() broadcasters {
100-
return broadcasters{
101-
ProposalBroadcaster: &mockBroadcaster[starknet.Proposal]{},
102-
PrevoteBroadcaster: &mockBroadcaster[starknet.Prevote]{},
103-
PrecommitBroadcaster: &mockBroadcaster[starknet.Precommit]{},
104-
}
105-
}
106-
10744
func mockTimeoutFn(step types.Step, round types.Round) time.Duration {
10845
return 1 * time.Millisecond
10946
}
@@ -217,20 +154,20 @@ func TestDriver(t *testing.T) {
217154
proposalCh := make(chan starknet.Proposal)
218155
prevoteCh := make(chan starknet.Prevote)
219156
precommitCh := make(chan starknet.Precommit)
220-
broadcasters := mockBroadcasters()
221157

222158
stateMachine := mocks.NewMockStateMachine[starknet.Value, hash.Hash, starknet.Address](ctrl)
223159
stateMachine.EXPECT().ReplayWAL().AnyTimes().Return() // ignore WAL replay logic here
224160

225161
commitAction := starknet.Commit(getRandProposal(random))
162+
p2p := newMockP2P(proposalCh, prevoteCh, precommitCh)
226163

227164
driver := driver.New(
228165
utils.NewNopZapLogger(),
229166
newTendermintDB(t),
230167
stateMachine,
231168
newMockBlockchain(t, &commitAction),
232-
mockListeners(proposalCh, prevoteCh, precommitCh),
233-
broadcasters,
169+
p2p,
170+
newMockProposer(),
234171
mockTimeoutFn,
235172
)
236173

@@ -261,25 +198,30 @@ func TestDriver(t *testing.T) {
261198
stateMachine.EXPECT().ProcessTimeout(inputTimeoutPrevote).Return(generateAndRegisterRandomActions(random, expectedBroadcast))
262199
stateMachine.EXPECT().ProcessTimeout(inputTimeoutPrecommit).Return(generateAndRegisterRandomActions(random, expectedBroadcast))
263200

264-
increaseBroadcasterWaitGroup(expectedBroadcast.proposals, broadcasters.ProposalBroadcaster)
265-
increaseBroadcasterWaitGroup(expectedBroadcast.prevotes, broadcasters.PrevoteBroadcaster)
266-
increaseBroadcasterWaitGroup(expectedBroadcast.precommits, broadcasters.PrecommitBroadcaster)
201+
increaseBroadcasterWaitGroup(expectedBroadcast.proposals, p2p.Broadcasters().ProposalBroadcaster)
202+
increaseBroadcasterWaitGroup(expectedBroadcast.prevotes, p2p.Broadcasters().PrevoteBroadcaster)
203+
increaseBroadcasterWaitGroup(expectedBroadcast.precommits, p2p.Broadcasters().PrecommitBroadcaster)
267204

268-
driver.Start()
205+
ctx, cancel := context.WithCancel(t.Context())
269206

270-
go func() {
207+
wg := conc.NewWaitGroup()
208+
wg.Go(func() {
209+
require.NoError(t, driver.Run(ctx))
210+
})
211+
wg.Go(func() {
271212
proposalCh <- inputProposalMsg
272-
}()
273-
go func() {
213+
})
214+
wg.Go(func() {
274215
prevoteCh <- inputPrevoteMsg
275-
}()
276-
go func() {
216+
})
217+
wg.Go(func() {
277218
precommitCh <- inputPrecommitMsg
278-
}()
219+
})
220+
t.Cleanup(wg.Wait)
279221

280-
waitAndAssertBroadcaster(t, expectedBroadcast.proposals, broadcasters.ProposalBroadcaster)
281-
waitAndAssertBroadcaster(t, expectedBroadcast.prevotes, broadcasters.PrevoteBroadcaster)
282-
waitAndAssertBroadcaster(t, expectedBroadcast.precommits, broadcasters.PrecommitBroadcaster)
222+
waitAndAssertBroadcaster(t, expectedBroadcast.proposals, p2p.Broadcasters().ProposalBroadcaster)
223+
waitAndAssertBroadcaster(t, expectedBroadcast.prevotes, p2p.Broadcasters().PrevoteBroadcaster)
224+
waitAndAssertBroadcaster(t, expectedBroadcast.precommits, p2p.Broadcasters().PrecommitBroadcaster)
283225

284-
driver.Stop()
226+
cancel()
285227
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package driver_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/NethermindEth/juno/consensus/starknet"
7+
"github.com/NethermindEth/juno/consensus/types"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
type mockBlockchain struct {
12+
t *testing.T
13+
expectedCommit *starknet.Commit
14+
}
15+
16+
func (m *mockBlockchain) Commit(height types.Height, value starknet.Value) {
17+
require.Equal(m.t, m.expectedCommit.Value, &value)
18+
require.Equal(m.t, m.expectedCommit.Height, height)
19+
}
20+
21+
func newMockBlockchain(t *testing.T, expectedCommit *starknet.Commit) blockchain {
22+
return &mockBlockchain{
23+
t: t,
24+
expectedCommit: expectedCommit,
25+
}
26+
}

0 commit comments

Comments
 (0)