Skip to content

Commit 47b32a4

Browse files
Merge branch 'consensus-pushes-sync-info' into fix-historical-feed-flood
2 parents 9baa2ad + b0c9f4f commit 47b32a4

File tree

10 files changed

+465
-78
lines changed

10 files changed

+465
-78
lines changed

arbnode/consensus_execution_syncer.go

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@ var DefaultConsensusExecutionSyncerConfig = ConsensusExecutionSyncerConfig{
2828
SyncInterval: 1 * time.Second,
2929
}
3030

31+
// We don't define a Test config. For most tests we want the Syncer to behave
32+
// the same as in production.
33+
3134
func ConsensusExecutionSyncerConfigAddOptions(prefix string, f *pflag.FlagSet) {
32-
f.Duration(prefix+".sync-interval", DefaultConsensusExecutionSyncerConfig.SyncInterval, "Interval in which finality data is pushed from consensus to execution")
35+
f.Duration(prefix+".sync-interval", DefaultConsensusExecutionSyncerConfig.SyncInterval, "Interval in which finality and sync data is pushed from consensus to execution")
3336
}
3437

3538
type ConsensusExecutionSyncer struct {
@@ -41,6 +44,7 @@ type ConsensusExecutionSyncer struct {
4144
execClient execution.ExecutionClient
4245
blockValidator *staker.BlockValidator
4346
txStreamer *TransactionStreamer
47+
syncMonitor *SyncMonitor
4448
}
4549

4650
func NewConsensusExecutionSyncer(
@@ -49,19 +53,24 @@ func NewConsensusExecutionSyncer(
4953
execClient execution.ExecutionClient,
5054
blockValidator *staker.BlockValidator,
5155
txStreamer *TransactionStreamer,
56+
syncMonitor *SyncMonitor,
5257
) *ConsensusExecutionSyncer {
5358
return &ConsensusExecutionSyncer{
5459
config: config,
5560
inboxReader: inboxReader,
5661
execClient: execClient,
5762
blockValidator: blockValidator,
5863
txStreamer: txStreamer,
64+
syncMonitor: syncMonitor,
5965
}
6066
}
6167

6268
func (c *ConsensusExecutionSyncer) Start(ctx_in context.Context) {
6369
c.StopWaiter.Start(ctx_in, c)
64-
c.CallIteratively(c.pushFinalityDataFromConsensusToExecution)
70+
if c.inboxReader != nil {
71+
c.CallIteratively(c.pushFinalityDataFromConsensusToExecution)
72+
}
73+
c.CallIteratively(c.pushConsensusSyncDataToExecution)
6574
}
6675

6776
func (c *ConsensusExecutionSyncer) getFinalityData(
@@ -140,3 +149,40 @@ func (c *ConsensusExecutionSyncer) pushFinalityDataFromConsensusToExecution(ctx
140149

141150
return c.config().SyncInterval
142151
}
152+
153+
func (c *ConsensusExecutionSyncer) pushConsensusSyncDataToExecution(ctx context.Context) time.Duration {
154+
synced := c.syncMonitor.Synced()
155+
156+
maxMessageCount, err := c.syncMonitor.GetMaxMessageCount()
157+
if err != nil {
158+
log.Error("Error getting max message count", "err", err)
159+
return c.config().SyncInterval
160+
}
161+
162+
var syncProgressMap map[string]interface{}
163+
if !synced {
164+
// Only populate the full progress map when not synced (for debugging)
165+
syncProgressMap = c.syncMonitor.FullSyncProgressMap()
166+
}
167+
168+
syncData := &execution.ConsensusSyncData{
169+
Synced: synced,
170+
MaxMessageCount: maxMessageCount,
171+
SyncProgressMap: syncProgressMap,
172+
UpdatedAt: time.Now(),
173+
}
174+
175+
_, err = c.execClient.SetConsensusSyncData(ctx, syncData).Await(ctx)
176+
if err != nil {
177+
log.Error("Error pushing sync data from consensus to execution", "err", err)
178+
} else {
179+
log.Debug("Pushed sync data from consensus to execution",
180+
"synced", syncData.Synced,
181+
"maxMessageCount", syncData.MaxMessageCount,
182+
"updatedAt", syncData.UpdatedAt,
183+
"hasProgressMap", syncData.SyncProgressMap != nil,
184+
)
185+
}
186+
187+
return c.config().SyncInterval
188+
}

arbnode/inbox_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ func (w *execClientWrapper) SetFinalityData(
8888
return containers.NewReadyPromise(struct{}{}, nil)
8989
}
9090

91+
func (w *execClientWrapper) SetConsensusSyncData(ctx context.Context, syncData *execution.ConsensusSyncData) containers.PromiseInterface[struct{}] {
92+
return containers.NewReadyPromise(struct{}{}, nil)
93+
}
94+
9195
func (w *execClientWrapper) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*execution.MessageResult] {
9296
return containers.NewReadyPromise(w.ExecutionEngine.DigestMessage(num, msg, msgForPrefetch))
9397
}

arbnode/node.go

Lines changed: 40 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -979,32 +979,46 @@ func getNodeParentChainReaderDisabled(
979979
configFetcher ConfigFetcher,
980980
blockMetadataFetcher *BlockMetadataFetcher,
981981
) *Node {
982+
// Create ConsensusExecutionSyncer even in L2-only mode to push sync data
983+
consensusExecutionSyncerConfigFetcher := func() *ConsensusExecutionSyncerConfig {
984+
return &configFetcher.Get().ConsensusExecutionSyncer
985+
}
986+
consensusExecutionSyncer := NewConsensusExecutionSyncer(
987+
consensusExecutionSyncerConfigFetcher,
988+
nil, // inboxReader
989+
executionClient,
990+
nil, // blockValidator
991+
txStreamer,
992+
syncMonitor,
993+
)
994+
982995
return &Node{
983-
ArbDB: arbDb,
984-
Stack: stack,
985-
ExecutionClient: executionClient,
986-
ExecutionSequencer: executionSequencer,
987-
ExecutionRecorder: executionRecorder,
988-
L1Reader: nil,
989-
TxStreamer: txStreamer,
990-
DeployInfo: nil,
991-
BlobReader: blobReader,
992-
InboxReader: nil,
993-
InboxTracker: nil,
994-
DelayedSequencer: nil,
995-
BatchPoster: nil,
996-
MessagePruner: nil,
997-
BlockValidator: nil,
998-
StatelessBlockValidator: nil,
999-
Staker: nil,
1000-
BroadcastServer: broadcastServer,
1001-
BroadcastClients: broadcastClients,
1002-
SeqCoordinator: coordinator,
1003-
MaintenanceRunner: maintenanceRunner,
1004-
SyncMonitor: syncMonitor,
1005-
configFetcher: configFetcher,
1006-
ctx: ctx,
1007-
blockMetadataFetcher: blockMetadataFetcher,
996+
ArbDB: arbDb,
997+
Stack: stack,
998+
ExecutionClient: executionClient,
999+
ExecutionSequencer: executionSequencer,
1000+
ExecutionRecorder: executionRecorder,
1001+
L1Reader: nil,
1002+
TxStreamer: txStreamer,
1003+
DeployInfo: nil,
1004+
BlobReader: blobReader,
1005+
InboxReader: nil,
1006+
InboxTracker: nil,
1007+
DelayedSequencer: nil,
1008+
BatchPoster: nil,
1009+
MessagePruner: nil,
1010+
BlockValidator: nil,
1011+
StatelessBlockValidator: nil,
1012+
Staker: nil,
1013+
BroadcastServer: broadcastServer,
1014+
BroadcastClients: broadcastClients,
1015+
SeqCoordinator: coordinator,
1016+
MaintenanceRunner: maintenanceRunner,
1017+
SyncMonitor: syncMonitor,
1018+
configFetcher: configFetcher,
1019+
ctx: ctx,
1020+
blockMetadataFetcher: blockMetadataFetcher,
1021+
ConsensusExecutionSyncer: consensusExecutionSyncer,
10081022
}
10091023
}
10101024

@@ -1124,7 +1138,7 @@ func createNodeImpl(
11241138
consensusExecutionSyncerConfigFetcher := func() *ConsensusExecutionSyncerConfig {
11251139
return &configFetcher.Get().ConsensusExecutionSyncer
11261140
}
1127-
consensusExecutionSyncer := NewConsensusExecutionSyncer(consensusExecutionSyncerConfigFetcher, inboxReader, executionClient, blockValidator, txStreamer)
1141+
consensusExecutionSyncer := NewConsensusExecutionSyncer(consensusExecutionSyncerConfigFetcher, inboxReader, executionClient, blockValidator, txStreamer, syncMonitor)
11281142

11291143
return &Node{
11301144
ArbDB: arbDb,
@@ -1514,18 +1528,6 @@ func (n *Node) GetBatchParentChainBlock(seqNum uint64) containers.PromiseInterfa
15141528
return containers.NewReadyPromise(n.InboxTracker.GetBatchParentChainBlock(seqNum))
15151529
}
15161530

1517-
func (n *Node) FullSyncProgressMap() containers.PromiseInterface[map[string]interface{}] {
1518-
return containers.NewReadyPromise(n.SyncMonitor.FullSyncProgressMap(), nil)
1519-
}
1520-
1521-
func (n *Node) Synced() containers.PromiseInterface[bool] {
1522-
return containers.NewReadyPromise(n.SyncMonitor.Synced(), nil)
1523-
}
1524-
1525-
func (n *Node) SyncTargetMessageCount() containers.PromiseInterface[arbutil.MessageIndex] {
1526-
return containers.NewReadyPromise(n.SyncMonitor.SyncTargetMessageCount(), nil)
1527-
}
1528-
15291531
func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata, msgResult execution.MessageResult, blockMetadata common.BlockMetadata) containers.PromiseInterface[struct{}] {
15301532
err := n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta, msgResult, blockMetadata)
15311533
return containers.NewReadyPromise(struct{}{}, err)

arbnode/sync_monitor.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ func (s *SyncMonitor) GetFinalizedMsgCount(ctx context.Context) (arbutil.Message
8484
return 0, nil
8585
}
8686

87+
func (s *SyncMonitor) GetMaxMessageCount() (arbutil.MessageIndex, error) {
88+
return s.maxMessageCount()
89+
}
90+
8791
func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) {
8892
msgCount, err := s.txStreamer.GetMessageCount()
8993
if err != nil {
@@ -136,7 +140,14 @@ func (s *SyncMonitor) FullSyncProgressMap() map[string]interface{} {
136140
}
137141

138142
syncTarget := s.SyncTargetMessageCount()
139-
res["syncTargetMsgCount"] = syncTarget
143+
res["consensusSyncTargetMsgCount"] = syncTarget
144+
145+
maxMsgCount, err := s.maxMessageCount()
146+
if err != nil {
147+
res["maxMessageCountError"] = err.Error()
148+
return res
149+
}
150+
res["maxMessageCount"] = maxMsgCount
140151

141152
msgCount, err := s.txStreamer.GetMessageCount()
142153
if err != nil {

execution/gethexec/node.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ var ConfigDefault = Config{
204204
TxPreChecker: DefaultTxPreCheckerConfig,
205205
Caching: DefaultCachingConfig,
206206
Forwarder: DefaultNodeForwarderConfig,
207+
SyncMonitor: DefaultSyncMonitorConfig,
207208

208209
EnablePrefetchBlock: true,
209210
StylusTarget: DefaultStylusTargetConfig,
@@ -570,6 +571,11 @@ func (n *ExecutionNode) SetFinalityData(
570571
return containers.NewReadyPromise(struct{}{}, err)
571572
}
572573

574+
func (n *ExecutionNode) SetConsensusSyncData(ctx context.Context, syncData *execution.ConsensusSyncData) containers.PromiseInterface[struct{}] {
575+
n.SyncMonitor.SetConsensusSyncData(syncData)
576+
return containers.NewReadyPromise(struct{}{}, nil)
577+
}
578+
573579
func (n *ExecutionNode) InitializeTimeboost(ctx context.Context, chainConfig *params.ChainConfig) error {
574580
execNodeConfig := n.ConfigFetcher()
575581
if execNodeConfig.Sequencer.Timeboost.Enable {

0 commit comments

Comments
 (0)