Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,29 +74,32 @@ type TransactionStreamer struct {
}

type TransactionStreamerConfig struct {
MaxBroadcasterQueueSize int `koanf:"max-broadcaster-queue-size"`
MaxReorgResequenceDepth int64 `koanf:"max-reorg-resequence-depth" reload:"hot"`
ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"`
SyncTillBlock uint64 `koanf:"sync-till-block"`
TrackBlockMetadataFrom uint64 `koanf:"track-block-metadata-from"`
MaxBroadcasterQueueSize int `koanf:"max-broadcaster-queue-size"`
MaxReorgResequenceDepth int64 `koanf:"max-reorg-resequence-depth" reload:"hot"`
ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"`
SyncTillBlock uint64 `koanf:"sync-till-block"`
TrackBlockMetadataFrom uint64 `koanf:"track-block-metadata-from"`
ShutdownOnBlockhashMismatch bool `koanf:"shutdown-on-blockhash-mismatch"`
}

type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig

var DefaultTransactionStreamerConfig = TransactionStreamerConfig{
MaxBroadcasterQueueSize: 50_000,
MaxReorgResequenceDepth: 1024,
ExecuteMessageLoopDelay: time.Millisecond * 100,
SyncTillBlock: 0,
TrackBlockMetadataFrom: 0,
MaxBroadcasterQueueSize: 50_000,
MaxReorgResequenceDepth: 1024,
ExecuteMessageLoopDelay: time.Millisecond * 100,
SyncTillBlock: 0,
TrackBlockMetadataFrom: 0,
ShutdownOnBlockhashMismatch: false,
}

var TestTransactionStreamerConfig = TransactionStreamerConfig{
MaxBroadcasterQueueSize: 10_000,
MaxReorgResequenceDepth: 128 * 1024,
ExecuteMessageLoopDelay: time.Millisecond,
SyncTillBlock: 0,
TrackBlockMetadataFrom: 0,
MaxBroadcasterQueueSize: 10_000,
MaxReorgResequenceDepth: 128 * 1024,
ExecuteMessageLoopDelay: time.Millisecond,
SyncTillBlock: 0,
TrackBlockMetadataFrom: 0,
ShutdownOnBlockhashMismatch: false,
}

func TransactionStreamerConfigAddOptions(prefix string, f *pflag.FlagSet) {
Expand All @@ -105,6 +108,7 @@ func TransactionStreamerConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages")
f.Uint64(prefix+".sync-till-block", DefaultTransactionStreamerConfig.SyncTillBlock, "node will not sync past this block")
f.Uint64(prefix+".track-block-metadata-from", DefaultTransactionStreamerConfig.TrackBlockMetadataFrom, "block number to start saving blockmetadata, 0 to disable")
f.Bool(prefix+".shutdown-on-blockhash-mismatch", DefaultTransactionStreamerConfig.ShutdownOnBlockhashMismatch, "if set the node gracefully shuts down upon detecting mismatch in feed and locally computed blockhash. This is turned off by default")
}

func NewTransactionStreamer(
Expand Down Expand Up @@ -1313,6 +1317,9 @@ func (s *TransactionStreamer) checkResult(msgIdx arbutil.MessageIndex, msgResult
log.Error("error writing batch that deletes blockMetadata of the block whose BlockHash from feed doesn't match locally computed hash", "msgIdx", msgIdx, "err", err)
}
}
if s.config().ShutdownOnBlockhashMismatch {
s.fatalErrChan <- fmt.Errorf("%s: msgIdx: %d, expectedHash: %v actualHash: %v", BlockHashMismatchLogMsg, msgIdx, msgAndBlockInfo.BlockHash, msgResult.BlockHash)
}
}
}

Expand Down
Loading