Skip to content

Commit 5c2e570

Browse files
authored
feat: Add scaffolding for proposer sending messages to P2P (#2856)
1 parent d96d62b commit 5c2e570

File tree

3 files changed

+357
-0
lines changed

3 files changed

+357
-0
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package proposer
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/NethermindEth/juno/consensus/p2p/buffered"
8+
"github.com/NethermindEth/juno/consensus/proposal"
9+
"github.com/NethermindEth/juno/consensus/types"
10+
"github.com/NethermindEth/juno/utils"
11+
pubsub "github.com/libp2p/go-libp2p-pubsub"
12+
"github.com/sourcegraph/conc"
13+
)
14+
15+
type proposalBroadcaster[V types.Hashable[H], H types.Hash, A types.Addr] struct {
16+
log utils.Logger
17+
proposalAdapter ProposerAdapter[V, H, A]
18+
proposalStore *proposal.ProposalStore[H]
19+
broadcaster buffered.ProtoBroadcaster
20+
proposals chan types.Proposal[V, H, A]
21+
}
22+
23+
func NewProposalBroadcaster[V types.Hashable[H], H types.Hash, A types.Addr](
24+
log utils.Logger,
25+
proposalAdapter ProposerAdapter[V, H, A],
26+
proposalStore *proposal.ProposalStore[H],
27+
bufferSize int,
28+
retryInterval time.Duration,
29+
) proposalBroadcaster[V, H, A] {
30+
return proposalBroadcaster[V, H, A]{
31+
log: log,
32+
proposalAdapter: proposalAdapter,
33+
proposalStore: proposalStore,
34+
broadcaster: buffered.NewProtoBroadcaster(log, bufferSize, retryInterval),
35+
proposals: make(chan types.Proposal[V, H, A], bufferSize),
36+
}
37+
}
38+
39+
func (b *proposalBroadcaster[V, H, A]) Loop(ctx context.Context, topic *pubsub.Topic) {
40+
wg := conc.NewWaitGroup()
41+
wg.Go(func() {
42+
b.broadcaster.Loop(ctx, topic)
43+
})
44+
45+
wg.Go(func() {
46+
b.processLoop(ctx)
47+
})
48+
49+
wg.Wait()
50+
}
51+
52+
func (b *proposalBroadcaster[V, H, A]) processLoop(ctx context.Context) {
53+
for {
54+
select {
55+
case <-ctx.Done():
56+
return
57+
case proposal := <-b.proposals:
58+
proposalHash := (*proposal.Value).Hash()
59+
buildResult := b.proposalStore.Get(proposalHash)
60+
if buildResult == nil {
61+
b.log.Errorw("proposal not found", "proposal", proposalHash)
62+
continue
63+
}
64+
65+
dispatcher, err := newProposerDispatcher(b.proposalAdapter, &proposal, buildResult)
66+
if err != nil {
67+
b.log.Errorw("unable to build dispatcher", "error", err)
68+
continue
69+
}
70+
71+
for msg, err := range dispatcher.run() {
72+
if err != nil {
73+
b.log.Errorw("unable to generate proposal part", "error", err)
74+
return
75+
}
76+
b.broadcaster.Broadcast(msg)
77+
}
78+
}
79+
}
80+
}
81+
82+
func (b *proposalBroadcaster[V, H, A]) Broadcast(proposal types.Proposal[V, H, A]) {
83+
b.proposals <- proposal
84+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package proposer
2+
3+
import (
4+
"errors"
5+
6+
"github.com/NethermindEth/juno/builder"
7+
"github.com/NethermindEth/juno/consensus/starknet"
8+
"github.com/NethermindEth/juno/consensus/types"
9+
"github.com/NethermindEth/juno/core"
10+
"github.com/NethermindEth/juno/core/felt"
11+
)
12+
13+
type ProposerAdapter[V types.Hashable[H], H types.Hash, A types.Addr] interface {
14+
ProposalInit(*types.Proposal[V, H, A]) (types.ProposalInit, error)
15+
ProposalBlockInfo(*builder.BuildResult) (types.BlockInfo, error)
16+
ProposalTransactions(*builder.BuildResult) ([]types.Transaction, error)
17+
ProposalCommitment(*builder.BuildResult) (types.ProposalCommitment, error)
18+
ProposalFin(*types.Proposal[V, H, A]) (types.ProposalFin, error)
19+
}
20+
21+
type starknetProposerAdapter struct{}
22+
23+
func NewStarknetProposerAdapter() ProposerAdapter[starknet.Value, starknet.Hash, starknet.Address] {
24+
return &starknetProposerAdapter{}
25+
}
26+
27+
func (a *starknetProposerAdapter) ProposalInit(proposal *starknet.Proposal) (types.ProposalInit, error) {
28+
return types.ProposalInit{
29+
BlockNum: proposal.Height,
30+
Round: proposal.Round,
31+
ValidRound: proposal.ValidRound,
32+
Proposer: felt.Felt(proposal.Sender),
33+
}, nil
34+
}
35+
36+
// TODO: Implement this function properly
37+
func (a *starknetProposerAdapter) ProposalBlockInfo(buildResult *builder.BuildResult) (types.BlockInfo, error) {
38+
return types.BlockInfo{
39+
BlockNumber: buildResult.Pending.Block.Number,
40+
Builder: *buildResult.Pending.Block.SequencerAddress,
41+
Timestamp: buildResult.Pending.Block.Timestamp,
42+
L2GasPriceFRI: *buildResult.Pending.Block.L2GasPrice.PriceInFri,
43+
L1GasPriceWEI: *buildResult.Pending.Block.L1GasPriceSTRK,
44+
L1DataGasPriceWEI: *buildResult.Pending.Block.L1DataGasPrice.PriceInFri,
45+
EthToStrkRate: felt.One, // TODO: Double check if this is used
46+
L1DAMode: buildResult.Pending.Block.L1DAMode,
47+
}, nil
48+
}
49+
50+
// TODO: Implement this function properly
51+
func (a *starknetProposerAdapter) ProposalTransactions(buildResult *builder.BuildResult) ([]types.Transaction, error) {
52+
transactions := make([]types.Transaction, len(buildResult.Pending.Block.Transactions))
53+
for i := range buildResult.Pending.Block.Transactions {
54+
var class core.Class
55+
var paidFeeOnL1 *felt.Felt
56+
57+
switch tx := buildResult.Pending.Block.Transactions[i].(type) {
58+
case *core.DeclareTransaction:
59+
var ok bool
60+
if class, ok = buildResult.Pending.NewClasses[*tx.ClassHash]; !ok {
61+
return nil, errors.New("class not found")
62+
}
63+
case *core.L1HandlerTransaction:
64+
paidFeeOnL1 = felt.One.Clone()
65+
}
66+
67+
transactions[i] = types.Transaction{
68+
Transaction: buildResult.Pending.Block.Transactions[i],
69+
Class: class,
70+
PaidFeeOnL1: paidFeeOnL1,
71+
}
72+
}
73+
74+
return transactions, nil
75+
}
76+
77+
// TODO: Implement this function properly
78+
func (a *starknetProposerAdapter) ProposalCommitment(buildResult *builder.BuildResult) (types.ProposalCommitment, error) {
79+
return buildResult.ProposalCommitment()
80+
}
81+
82+
// TODO: Implement this function properly
83+
func (a *starknetProposerAdapter) ProposalFin(proposal *starknet.Proposal) (types.ProposalFin, error) {
84+
return types.ProposalFin(proposal.Value.Hash()), nil
85+
}
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
package proposer
2+
3+
import (
4+
"iter"
5+
"slices"
6+
7+
"github.com/NethermindEth/juno/adapters/consensus2p2p"
8+
"github.com/NethermindEth/juno/builder"
9+
"github.com/NethermindEth/juno/consensus/types"
10+
"github.com/starknet-io/starknet-p2pspecs/p2p/proto/common"
11+
"github.com/starknet-io/starknet-p2pspecs/p2p/proto/consensus/consensus"
12+
"google.golang.org/protobuf/proto"
13+
)
14+
15+
const txBatchSize = 64 // TODO: make this configurable
16+
17+
type dispatcher[V types.Hashable[H], H types.Hash, A types.Addr] struct {
18+
adapter ProposerAdapter[V, H, A]
19+
proposal *types.Proposal[V, H, A]
20+
buildResult *builder.BuildResult
21+
streamID string
22+
seqNo uint64
23+
}
24+
25+
func newProposerDispatcher[V types.Hashable[H], H types.Hash, A types.Addr](
26+
adapter ProposerAdapter[V, H, A],
27+
proposal *types.Proposal[V, H, A],
28+
buildResult *builder.BuildResult,
29+
) (dispatcher[V, H, A], error) {
30+
streamIDStruct := &consensus.ConsensusStreamId{
31+
BlockNumber: uint64(proposal.Height),
32+
Round: uint32(proposal.Round),
33+
Nonce: 0,
34+
}
35+
36+
streamID, err := proto.Marshal(streamIDStruct)
37+
if err != nil {
38+
return dispatcher[V, H, A]{}, err // TODO: log error
39+
}
40+
41+
return dispatcher[V, H, A]{
42+
adapter: adapter,
43+
proposal: proposal,
44+
buildResult: buildResult,
45+
streamID: string(streamID),
46+
seqNo: 0,
47+
}, nil
48+
}
49+
50+
func (d *dispatcher[V, H, A]) run() iter.Seq2[*consensus.StreamMessage, error] {
51+
return func(yield func(*consensus.StreamMessage, error) bool) {
52+
if !yield(d.fromProposalInit()) {
53+
return
54+
}
55+
56+
if !yield(d.fromBlockInfo()) {
57+
return
58+
}
59+
60+
txStream, err := d.adapter.ProposalTransactions(d.buildResult)
61+
if err != nil {
62+
yield(nil, err)
63+
return
64+
}
65+
66+
for txBatch := range slices.Chunk(txStream, txBatchSize) {
67+
if !yield(d.fromTransactions(txBatch)) {
68+
return
69+
}
70+
}
71+
72+
if !yield(d.fromProposalCommitment()) {
73+
return
74+
}
75+
76+
if !yield(d.fromProposalFin()) {
77+
return
78+
}
79+
80+
if !yield(d.fromFin(), nil) {
81+
return
82+
}
83+
}
84+
}
85+
86+
func (d *dispatcher[V, H, A]) fromProposalInit() (*consensus.StreamMessage, error) {
87+
proposalInit, err := d.adapter.ProposalInit(d.proposal)
88+
if err != nil {
89+
return nil, err // TODO: log error
90+
}
91+
92+
p2pProposalInit := consensus2p2p.AdaptProposalInit(&proposalInit)
93+
94+
return d.sendProposalPart(&consensus.ProposalPart{
95+
Messages: &consensus.ProposalPart_Init{
96+
Init: &p2pProposalInit,
97+
},
98+
})
99+
}
100+
101+
func (d *dispatcher[V, H, A]) fromBlockInfo() (*consensus.StreamMessage, error) {
102+
blockInfo, err := d.adapter.ProposalBlockInfo(d.buildResult)
103+
if err != nil {
104+
return nil, err // TODO: log error
105+
}
106+
107+
p2pBlockInfo := consensus2p2p.AdaptBlockInfo(&blockInfo)
108+
109+
return d.sendProposalPart(&consensus.ProposalPart{
110+
Messages: &consensus.ProposalPart_BlockInfo{
111+
BlockInfo: &p2pBlockInfo,
112+
},
113+
})
114+
}
115+
116+
func (d *dispatcher[V, H, A]) fromTransactions(txs []types.Transaction) (*consensus.StreamMessage, error) {
117+
p2pTxBatch, err := consensus2p2p.AdaptProposalTransaction(txs)
118+
if err != nil {
119+
return nil, err // TODO: log error
120+
}
121+
122+
return d.sendProposalPart(&consensus.ProposalPart{
123+
Messages: &consensus.ProposalPart_Transactions{
124+
Transactions: &p2pTxBatch,
125+
},
126+
})
127+
}
128+
129+
func (d *dispatcher[V, H, A]) fromProposalCommitment() (*consensus.StreamMessage, error) {
130+
commitment, err := d.adapter.ProposalCommitment(d.buildResult)
131+
if err != nil {
132+
return nil, err // TODO: log error
133+
}
134+
135+
p2pCommitment := consensus2p2p.AdaptProposalCommitment(&commitment)
136+
137+
return d.sendProposalPart(&consensus.ProposalPart{
138+
Messages: &consensus.ProposalPart_Commitment{
139+
Commitment: &p2pCommitment,
140+
},
141+
})
142+
}
143+
144+
func (d *dispatcher[V, H, A]) fromProposalFin() (*consensus.StreamMessage, error) {
145+
fin, err := d.adapter.ProposalFin(d.proposal)
146+
if err != nil {
147+
return nil, err // TODO: log error
148+
}
149+
150+
p2pFin := consensus2p2p.AdaptProposalFin(&fin)
151+
152+
return d.sendProposalPart(&consensus.ProposalPart{
153+
Messages: &consensus.ProposalPart_Fin{
154+
Fin: &p2pFin,
155+
},
156+
})
157+
}
158+
159+
func (d *dispatcher[V, H, A]) sendProposalPart(proposal *consensus.ProposalPart) (*consensus.StreamMessage, error) {
160+
proposalBytes, err := proto.Marshal(proposal)
161+
if err != nil {
162+
return nil, err // TODO: log error
163+
}
164+
165+
return &consensus.StreamMessage{
166+
Message: &consensus.StreamMessage_Content{
167+
Content: proposalBytes,
168+
},
169+
StreamId: []byte(d.streamID),
170+
SequenceNumber: d.next(),
171+
}, nil
172+
}
173+
174+
func (d *dispatcher[V, H, A]) fromFin() *consensus.StreamMessage {
175+
return &consensus.StreamMessage{
176+
Message: &consensus.StreamMessage_Fin{
177+
Fin: &common.Fin{},
178+
},
179+
StreamId: []byte(d.streamID),
180+
SequenceNumber: d.next(),
181+
}
182+
}
183+
184+
func (d *dispatcher[V, H, A]) next() uint64 {
185+
seqNo := d.seqNo
186+
d.seqNo++
187+
return seqNo
188+
}

0 commit comments

Comments
 (0)