Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions arbnode/consensus_execution_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var DefaultConsensusExecutionSyncerConfig = ConsensusExecutionSyncerConfig{
}

func ConsensusExecutionSyncerConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Duration(prefix+".sync-interval", DefaultConsensusExecutionSyncerConfig.SyncInterval, "Interval in which finality data is pushed from consensus to execution")
f.Duration(prefix+".sync-interval", DefaultConsensusExecutionSyncerConfig.SyncInterval, "Interval in which finality and sync data is pushed from consensus to execution")
}

type ConsensusExecutionSyncer struct {
Expand All @@ -41,6 +41,7 @@ type ConsensusExecutionSyncer struct {
execClient execution.ExecutionClient
blockValidator *staker.BlockValidator
txStreamer *TransactionStreamer
syncMonitor *SyncMonitor
}

func NewConsensusExecutionSyncer(
Expand All @@ -49,19 +50,24 @@ func NewConsensusExecutionSyncer(
execClient execution.ExecutionClient,
blockValidator *staker.BlockValidator,
txStreamer *TransactionStreamer,
syncMonitor *SyncMonitor,
) *ConsensusExecutionSyncer {
return &ConsensusExecutionSyncer{
config: config,
inboxReader: inboxReader,
execClient: execClient,
blockValidator: blockValidator,
txStreamer: txStreamer,
syncMonitor: syncMonitor,
}
}

func (c *ConsensusExecutionSyncer) Start(ctx_in context.Context) {
c.StopWaiter.Start(ctx_in, c)
c.CallIteratively(c.pushFinalityDataFromConsensusToExecution)
if c.inboxReader != nil {
c.CallIteratively(c.pushFinalityDataFromConsensusToExecution)
}
c.CallIteratively(c.pushConsensusSyncDataToExecution)
}

func (c *ConsensusExecutionSyncer) getFinalityData(
Expand Down Expand Up @@ -140,3 +146,24 @@ func (c *ConsensusExecutionSyncer) pushFinalityDataFromConsensusToExecution(ctx

return c.config().SyncInterval
}

func (c *ConsensusExecutionSyncer) pushConsensusSyncDataToExecution(ctx context.Context) time.Duration {
syncData := &execution.ConsensusSyncData{
Synced: c.syncMonitor.Synced(),
SyncTargetMessageCount: c.syncMonitor.SyncTargetMessageCount(),
SyncProgressMap: c.syncMonitor.FullSyncProgressMap(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SyncProgressMap should be nil or empty if Synced (prevents wasteful locking/etc)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in latest commit.

}

_, err := c.execClient.SetConsensusSyncData(ctx, syncData).Await(ctx)
if err != nil {
log.Error("Error pushing sync data from consensus to execution", "err", err)
} else {
log.Debug("Pushed sync data from consensus to execution",
"synced", syncData.Synced,
"syncTarget", syncData.SyncTargetMessageCount,
"syncProgressMap", syncData.SyncProgressMap,
)
}

return c.config().SyncInterval
}
4 changes: 4 additions & 0 deletions arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func (w *execClientWrapper) SetFinalityData(
return containers.NewReadyPromise(struct{}{}, nil)
}

func (w *execClientWrapper) SetConsensusSyncData(ctx context.Context, syncData *execution.ConsensusSyncData) containers.PromiseInterface[struct{}] {
return containers.NewReadyPromise(struct{}{}, nil)
}

func (w *execClientWrapper) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*execution.MessageResult] {
return containers.NewReadyPromise(w.ExecutionEngine.DigestMessage(num, msg, msgForPrefetch))
}
Expand Down
78 changes: 40 additions & 38 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,32 +978,46 @@ func getNodeParentChainReaderDisabled(
configFetcher ConfigFetcher,
blockMetadataFetcher *BlockMetadataFetcher,
) *Node {
// Create ConsensusExecutionSyncer even in L2-only mode to push sync data
consensusExecutionSyncerConfigFetcher := func() *ConsensusExecutionSyncerConfig {
return &configFetcher.Get().ConsensusExecutionSyncer
}
consensusExecutionSyncer := NewConsensusExecutionSyncer(
consensusExecutionSyncerConfigFetcher,
nil, // inboxReader
executionClient,
nil, // blockValidator
txStreamer,
syncMonitor,
)

return &Node{
ArbDB: arbDb,
Stack: stack,
ExecutionClient: executionClient,
ExecutionSequencer: executionSequencer,
ExecutionRecorder: executionRecorder,
L1Reader: nil,
TxStreamer: txStreamer,
DeployInfo: nil,
BlobReader: blobReader,
InboxReader: nil,
InboxTracker: nil,
DelayedSequencer: nil,
BatchPoster: nil,
MessagePruner: nil,
BlockValidator: nil,
StatelessBlockValidator: nil,
Staker: nil,
BroadcastServer: broadcastServer,
BroadcastClients: broadcastClients,
SeqCoordinator: coordinator,
MaintenanceRunner: maintenanceRunner,
SyncMonitor: syncMonitor,
configFetcher: configFetcher,
ctx: ctx,
blockMetadataFetcher: blockMetadataFetcher,
ArbDB: arbDb,
Stack: stack,
ExecutionClient: executionClient,
ExecutionSequencer: executionSequencer,
ExecutionRecorder: executionRecorder,
L1Reader: nil,
TxStreamer: txStreamer,
DeployInfo: nil,
BlobReader: blobReader,
InboxReader: nil,
InboxTracker: nil,
DelayedSequencer: nil,
BatchPoster: nil,
MessagePruner: nil,
BlockValidator: nil,
StatelessBlockValidator: nil,
Staker: nil,
BroadcastServer: broadcastServer,
BroadcastClients: broadcastClients,
SeqCoordinator: coordinator,
MaintenanceRunner: maintenanceRunner,
SyncMonitor: syncMonitor,
configFetcher: configFetcher,
ctx: ctx,
blockMetadataFetcher: blockMetadataFetcher,
ConsensusExecutionSyncer: consensusExecutionSyncer,
}
}

Expand Down Expand Up @@ -1123,7 +1137,7 @@ func createNodeImpl(
consensusExecutionSyncerConfigFetcher := func() *ConsensusExecutionSyncerConfig {
return &configFetcher.Get().ConsensusExecutionSyncer
}
consensusExecutionSyncer := NewConsensusExecutionSyncer(consensusExecutionSyncerConfigFetcher, inboxReader, executionClient, blockValidator, txStreamer)
consensusExecutionSyncer := NewConsensusExecutionSyncer(consensusExecutionSyncerConfigFetcher, inboxReader, executionClient, blockValidator, txStreamer, syncMonitor)

return &Node{
ArbDB: arbDb,
Expand Down Expand Up @@ -1513,18 +1527,6 @@ func (n *Node) GetBatchParentChainBlock(seqNum uint64) containers.PromiseInterfa
return containers.NewReadyPromise(n.InboxTracker.GetBatchParentChainBlock(seqNum))
}

func (n *Node) FullSyncProgressMap() containers.PromiseInterface[map[string]interface{}] {
return containers.NewReadyPromise(n.SyncMonitor.FullSyncProgressMap(), nil)
}

func (n *Node) Synced() containers.PromiseInterface[bool] {
return containers.NewReadyPromise(n.SyncMonitor.Synced(), nil)
}

func (n *Node) SyncTargetMessageCount() containers.PromiseInterface[arbutil.MessageIndex] {
return containers.NewReadyPromise(n.SyncMonitor.SyncTargetMessageCount(), nil)
}

func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata, msgResult execution.MessageResult, blockMetadata common.BlockMetadata) containers.PromiseInterface[struct{}] {
err := n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta, msgResult, blockMetadata)
return containers.NewReadyPromise(struct{}{}, err)
Expand Down
5 changes: 5 additions & 0 deletions execution/gethexec/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,11 @@ func (n *ExecutionNode) SetFinalityData(
return containers.NewReadyPromise(struct{}{}, err)
}

func (n *ExecutionNode) SetConsensusSyncData(ctx context.Context, syncData *execution.ConsensusSyncData) containers.PromiseInterface[struct{}] {
n.SyncMonitor.SetConsensusSyncData(syncData)
return containers.NewReadyPromise(struct{}{}, nil)
}

func (n *ExecutionNode) InitializeTimeboost(ctx context.Context, chainConfig *params.ChainConfig) error {
execNodeConfig := n.ConfigFetcher()
if execNodeConfig.Sequencer.Timeboost.Enable {
Expand Down
63 changes: 33 additions & 30 deletions execution/gethexec/sync_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"

flag "github.com/spf13/pflag"

Expand Down Expand Up @@ -34,6 +35,8 @@ type SyncMonitor struct {
config *SyncMonitorConfig
consensus execution.ConsensusInfo
exec *ExecutionEngine

consensusSyncData atomic.Pointer[execution.ConsensusSyncData]
}

func NewSyncMonitor(config *SyncMonitorConfig, exec *ExecutionEngine) *SyncMonitor {
Expand All @@ -43,20 +46,25 @@ func NewSyncMonitor(config *SyncMonitorConfig, exec *ExecutionEngine) *SyncMonit
}
}

// SetConsensusSyncData updates the sync data pushed from consensus
func (s *SyncMonitor) SetConsensusSyncData(syncData *execution.ConsensusSyncData) {
s.consensusSyncData.Store(syncData)
}

func (s *SyncMonitor) FullSyncProgressMap(ctx context.Context) map[string]interface{} {
res, err := s.consensus.FullSyncProgressMap().Await(ctx)
if err != nil {
res = make(map[string]interface{})
res["fullSyncProgressMapError"] = err
data := s.consensusSyncData.Load()
if data == nil {
return map[string]interface{}{"error": "no consensus sync data available"}
}

consensusSyncTarget, err := s.consensus.SyncTargetMessageCount().Await(ctx)
if err != nil {
res["consensusSyncTargetError"] = err
} else {
res["consensusSyncTarget"] = consensusSyncTarget
res := make(map[string]interface{})
for k, v := range data.SyncProgressMap {
res[k] = v
}

res["consensusSyncTarget"] = data.SyncTargetMessageCount

// Add execution-specific data
header, err := s.exec.getCurrentHeader()
if err != nil {
res["currentHeaderError"] = err
Expand All @@ -82,32 +90,27 @@ func (s *SyncMonitor) SyncProgressMap(ctx context.Context) map[string]interface{
}

func (s *SyncMonitor) Synced(ctx context.Context) bool {
synced, err := s.consensus.Synced().Await(ctx)
if err != nil {
log.Error("Error checking if consensus is synced", "err", err)
data := s.consensusSyncData.Load()
if data == nil {
return false
}
if synced {
built, err := s.exec.HeadMessageIndex()
if err != nil {
log.Error("Error getting head message index", "err", err)
return false
}

consensusSyncTarget, err := s.consensus.SyncTargetMessageCount().Await(ctx)
if err != nil {
log.Error("Error getting consensus sync target", "err", err)
return false
}
if consensusSyncTarget == 0 {
return false
}
if !data.Synced {
return false
}

if built+1 >= consensusSyncTarget {
return true
}
// Additional execution-side validation
built, err := s.exec.HeadMessageIndex()
if err != nil {
log.Error("Error getting head message index", "err", err)
return false
}
return false

if data.SyncTargetMessageCount == 0 {
return false
}

return built+1 >= data.SyncTargetMessageCount
}

func (s *SyncMonitor) SetConsensusInfo(consensus execution.ConsensusInfo) {
Expand Down
11 changes: 8 additions & 3 deletions execution/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ type InboxBatch struct {
Found bool
}

// ConsensusSyncData contains sync status information pushed from consensus to execution
type ConsensusSyncData struct {
Synced bool
SyncTargetMessageCount arbutil.MessageIndex
SyncProgressMap map[string]interface{}
}

var ErrRetrySequencer = errors.New("please retry transaction")
var ErrSequencerInsertLockTaken = errors.New("insert lock taken")

Expand All @@ -45,6 +52,7 @@ type ExecutionClient interface {
MessageIndexToBlockNumber(messageNum arbutil.MessageIndex) containers.PromiseInterface[uint64]
BlockNumberToMessageIndex(blockNum uint64) containers.PromiseInterface[arbutil.MessageIndex]
SetFinalityData(ctx context.Context, safeFinalityData *arbutil.FinalityData, finalizedFinalityData *arbutil.FinalityData, validatedFinalityData *arbutil.FinalityData) containers.PromiseInterface[struct{}]
SetConsensusSyncData(ctx context.Context, syncData *ConsensusSyncData) containers.PromiseInterface[struct{}]
MarkFeedStart(to arbutil.MessageIndex) containers.PromiseInterface[struct{}]

TriggerMaintenance() containers.PromiseInterface[struct{}]
Expand Down Expand Up @@ -91,9 +99,6 @@ type BatchFetcher interface {
}

type ConsensusInfo interface {
Synced() containers.PromiseInterface[bool]
FullSyncProgressMap() containers.PromiseInterface[map[string]interface{}]
SyncTargetMessageCount() containers.PromiseInterface[arbutil.MessageIndex]
BlockMetadataAtMessageIndex(msgIdx arbutil.MessageIndex) containers.PromiseInterface[common.BlockMetadata]
}

Expand Down
1 change: 1 addition & 0 deletions system_tests/seq_coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func TestRedisSeqCoordinatorPriorities(t *testing.T) {
builder.takeOwnership = false
builder.nodeConfig.SeqCoordinator.Enable = true
builder.nodeConfig.SeqCoordinator.RedisUrl = redisutil.CreateTestRedis(ctx, t)
builder.nodeConfig.ConsensusExecutionSyncer.SyncInterval = 10 * time.Millisecond

l2Info := builder.L2Info

Expand Down
Loading