diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index daff5ed06b..818937882e 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -1207,17 +1207,16 @@ func (s *TransactionStreamer) checkResult(pos arbutil.MessageIndex, msgResult *e "actual", msgResult.BlockHash, ) // Try deleting the existing blockMetadata for this block in arbDB and set it as missing - if msgAndBlockInfo.BlockMetadata != nil { + if msgAndBlockInfo.BlockMetadata != nil && + s.trackBlockMetadataFrom != 0 && pos >= s.trackBlockMetadataFrom { batch := s.db.NewBatch() if err := batch.Delete(dbKey(blockMetadataInputFeedPrefix, uint64(pos))); err != nil { log.Error("error deleting blockMetadata of block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err) return } - if s.trackBlockMetadataFrom != 0 && pos >= s.trackBlockMetadataFrom { - if err := batch.Put(dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos)), nil); err != nil { - log.Error("error marking deleted blockMetadata as missing in arbDB for a block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err) - return - } + if err := batch.Put(dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos)), nil); err != nil { + log.Error("error marking deleted blockMetadata as missing in arbDB for a block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err) + return } if err := batch.Write(); err != nil { log.Error("error writing batch that deletes blockMetadata of the block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err) @@ -1320,7 +1319,72 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc return s.config().ExecuteMessageLoopDelay } +func (s *TransactionStreamer) backfillTrackersForMissingBlockMetadata(ctx context.Context) { + if s.trackBlockMetadataFrom == 0 { + return + } + msgCount, err := s.GetMessageCount() + if err != nil { + log.Error("Error getting message count from arbDB", "err", err) + return + } + if s.trackBlockMetadataFrom >= msgCount { + return // We dont need to back fill if trackBlockMetadataFrom is in the future + } + + wasKeyFound := func(pos uint64) bool { + searchWithPrefix := func(prefix []byte) bool { + key := dbKey(prefix, pos) + _, err := s.db.Get(key) + if err == nil { + return true + } + if !dbutil.IsErrNotFound(err) { + log.Error("Error reading key in arbDB while back-filling trackers for missing blockMetadata", "key", key, "err", err) + } + return false + } + return searchWithPrefix(blockMetadataInputFeedPrefix) || searchWithPrefix(missingBlockMetadataInputFeedPrefix) + } + + start := s.trackBlockMetadataFrom + if wasKeyFound(uint64(start)) { + return // back-filling not required + } + finish := msgCount - 1 + for start < finish { + mid := (start + finish + 1) / 2 + if wasKeyFound(uint64(mid)) { + finish = mid - 1 + } else { + start = mid + } + } + lastNonExistent := start + + // We back-fill in reverse to avoid fragmentation in case of any failures + batch := s.db.NewBatch() + for i := lastNonExistent; i >= s.trackBlockMetadataFrom; i-- { + if err := batch.Put(dbKey(missingBlockMetadataInputFeedPrefix, uint64(i)), nil); err != nil { + log.Error("Error marking blockMetadata as missing while back-filling", "pos", i, "err", err) + return + } + // If we reached the ideal batch size, commit and reset + if batch.ValueSize() >= ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + log.Error("Error writing batch with missing trackers to db while back-filling", "err", err) + return + } + batch.Reset() + } + } + if err := batch.Write(); err != nil { + log.Error("Error writing batch with missing trackers to db while back-filling", "err", err) + } +} + func (s *TransactionStreamer) Start(ctxIn context.Context) error { s.StopWaiter.Start(ctxIn, s) + s.LaunchThread(s.backfillTrackersForMissingBlockMetadata) return stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.executeMessages, s.newMessageNotifier) } diff --git a/arbnode/tx_streamer_test.go b/arbnode/tx_streamer_test.go new file mode 100644 index 0000000000..2102f8fbeb --- /dev/null +++ b/arbnode/tx_streamer_test.go @@ -0,0 +1,67 @@ +package arbnode + +import ( + "bytes" + "context" + "encoding/binary" + "testing" + + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/rlp" + + "github.com/offchainlabs/nitro/arbutil" +) + +func TestTimeboostBackfillingsTrackersForMissingBlockMetadata(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + messageCount := uint64(20) + + // Create arbDB with fragmented blockMetadata across blocks + arbDb := rawdb.NewMemoryDatabase() + countBytes, err := rlp.EncodeToBytes(messageCount) + Require(t, err) + Require(t, arbDb.Put(messageCountKey, countBytes)) + addKeys := func(start, end uint64, prefix []byte) { + for i := start; i <= end; i++ { + Require(t, arbDb.Put(dbKey(prefix, i), []byte{})) + } + } + // 12, 13, 14, 18 have block metadata + addKeys(12, 14, blockMetadataInputFeedPrefix) + addKeys(18, 18, blockMetadataInputFeedPrefix) + // 15, 16, 17, 19 are missing + addKeys(15, 17, missingBlockMetadataInputFeedPrefix) + addKeys(19, 19, missingBlockMetadataInputFeedPrefix) + + // Create tx streamer + txStreamer := &TransactionStreamer{db: arbDb} + txStreamer.StopWaiter.Start(ctx, txStreamer) + + backfillAndVerifyCorrectness := func(trackBlockMetadataFrom arbutil.MessageIndex, missingTrackers []uint64) { + txStreamer.trackBlockMetadataFrom = trackBlockMetadataFrom + txStreamer.backfillTrackersForMissingBlockMetadata(ctx) + iter := arbDb.NewIterator([]byte("x"), nil) + pos := 0 + for iter.Next() { + keyBytes := bytes.TrimPrefix(iter.Key(), missingBlockMetadataInputFeedPrefix) + if binary.BigEndian.Uint64(keyBytes) != missingTrackers[pos] { + t.Fatalf("unexpected presence of blockMetadata. msgSeqNum: %d, expectedMsgSeqNum: %d", binary.BigEndian.Uint64(keyBytes), missingTrackers[pos]) + } + pos++ + } + if pos != len(missingTrackers) { + t.Fatalf("number of keys with blockMetadataInputFeedPrefix doesn't match expected value. Want: %d, Got: %d", len(missingTrackers), pos) + } + iter.Release() + } + + // Backfill trackers for missing data and verify that 10, 11 get added to already existing 16, 17, 18, 19 keys + backfillAndVerifyCorrectness(10, []uint64{10, 11, 15, 16, 17, 19}) + + // Backfill trackers for missing data and verify that 5, 6, 7, 8, 9 get added to already existing 10, 11, 16, 17, 18, 19 keys + backfillAndVerifyCorrectness(5, []uint64{5, 6, 7, 8, 9, 10, 11, 15, 16, 17, 19}) +}