Skip to content

Commit c52a971

Browse files
joshuacolvin0claude
andcommitted
fix: flaky pathdb test failures caused by MEL integration
Fixes flaky test failures caused by the Message Extraction Layer (MEL) integration by introducing a BatchDataReader interface that abstracts over MEL vs InboxTracker, and adding proper delayed message rollback on accumulator mismatch in addMessages. Key changes: - Add BatchDataReader interface and BatchDataSource() to eliminate duplicated MEL-vs-InboxTracker dispatch across system tests - Roll back delayed messages when AddSequencerBatches detects a mismatch, preventing orphaned DB entries - Wrap both rollback and original mismatch errors in the error chain - Replace unbounded polling loops in tests with deadline-guarded waits - Fix pre-existing diagnostic bug in meaningless_reorg_test.go Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 0973192 commit c52a971

17 files changed

+618
-247
lines changed

arbnode/delayed_seq_reorg_test.go

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,14 @@ package arbnode
66
import (
77
"context"
88
"encoding/binary"
9+
"errors"
10+
"strings"
11+
"sync/atomic"
912
"testing"
1013

1114
"github.com/ethereum/go-ethereum/common"
1215
"github.com/ethereum/go-ethereum/core/types"
16+
"github.com/ethereum/go-ethereum/ethdb"
1317

1418
"github.com/offchainlabs/nitro/arbnode/mel"
1519
"github.com/offchainlabs/nitro/arbos/arbostypes"
@@ -446,3 +450,268 @@ func TestSequencerReorgFromLastDelayedMsg(t *testing.T) {
446450
Fail(t, "Unexpected tracker batch count", batchCount, "(expected 2)")
447451
}
448452
}
453+
454+
// mismatchTestFixture holds the shared state for delayed-mismatch tests.
455+
type mismatchTestFixture struct {
456+
ctx context.Context
457+
tracker *InboxTracker
458+
initDelayed *mel.DelayedInboxMessage
459+
userDelayed *mel.DelayedInboxMessage
460+
mismatchBatch *mel.SequencerInboxBatch
461+
}
462+
463+
// newMismatchTestFixture creates a tracker with one init delayed message
464+
// committed to the DB (delayed count = 1) and prepares a second delayed
465+
// message and a batch whose AfterDelayedAcc is intentionally wrong.
466+
func newMismatchTestFixture(t *testing.T, ctx context.Context) *mismatchTestFixture {
467+
t.Helper()
468+
exec, streamer, db, _ := NewTransactionStreamerForTest(t, ctx, common.Address{})
469+
tracker, err := NewInboxTracker(db, streamer, nil)
470+
Require(t, err)
471+
472+
err = streamer.Start(ctx)
473+
Require(t, err)
474+
err = exec.Start(ctx)
475+
Require(t, err)
476+
init, err := streamer.GetMessage(0)
477+
Require(t, err)
478+
479+
initDelayed := &mel.DelayedInboxMessage{
480+
BlockHash: [32]byte{},
481+
BeforeInboxAcc: [32]byte{},
482+
Message: init.Message,
483+
}
484+
delayedRequestId := common.BigToHash(common.Big1)
485+
userDelayed := &mel.DelayedInboxMessage{
486+
BlockHash: [32]byte{},
487+
BeforeInboxAcc: initDelayed.AfterInboxAcc(),
488+
Message: &arbostypes.L1IncomingMessage{
489+
Header: &arbostypes.L1IncomingMessageHeader{
490+
Kind: arbostypes.L1MessageType_EndOfBlock,
491+
Poster: [20]byte{},
492+
BlockNumber: 0,
493+
Timestamp: 0,
494+
RequestId: &delayedRequestId,
495+
L1BaseFee: common.Big0,
496+
},
497+
},
498+
}
499+
500+
err = tracker.AddDelayedMessages([]*mel.DelayedInboxMessage{initDelayed})
501+
Require(t, err)
502+
503+
serializedBatch := make([]byte, 40)
504+
binary.BigEndian.PutUint64(serializedBatch[32:], 1)
505+
mismatchBatch := &mel.SequencerInboxBatch{
506+
BlockHash: [32]byte{},
507+
ParentChainBlockNumber: 0,
508+
SequenceNumber: 0,
509+
BeforeInboxAcc: [32]byte{},
510+
AfterInboxAcc: [32]byte{1},
511+
AfterDelayedAcc: common.Hash{0xff}, // wrong accumulator
512+
AfterDelayedCount: 2,
513+
TimeBounds: bridgegen.IBridgeTimeBounds{},
514+
RawLog: types.Log{},
515+
DataLocation: 0,
516+
BridgeAddress: [20]byte{},
517+
Serialized: serializedBatch,
518+
}
519+
520+
return &mismatchTestFixture{
521+
ctx: ctx,
522+
tracker: tracker,
523+
initDelayed: initDelayed,
524+
userDelayed: userDelayed,
525+
mismatchBatch: mismatchBatch,
526+
}
527+
}
528+
529+
// TestDelayedMismatchRollsBackDelayedMessages verifies that addMessages rolls
530+
// back delayed messages when AddSequencerBatches fails with a delayed
531+
// accumulator mismatch. Without the rollback, delayed messages would be
532+
// committed to the DB without corresponding batches.
533+
func TestDelayedMismatchRollsBackDelayedMessages(t *testing.T) {
534+
ctx, cancel := context.WithCancel(context.Background())
535+
defer cancel()
536+
f := newMismatchTestFixture(t, ctx)
537+
538+
// addMessages should roll back delayed messages on mismatch
539+
reader := &InboxReader{tracker: f.tracker}
540+
delayedMismatch, err := reader.addMessages(
541+
ctx,
542+
[]*mel.SequencerInboxBatch{f.mismatchBatch},
543+
[]*mel.DelayedInboxMessage{f.userDelayed},
544+
)
545+
Require(t, err)
546+
if !delayedMismatch {
547+
Fail(t, "Expected delayedMismatch to be true")
548+
}
549+
550+
// Delayed count should be rolled back to 1 (the init message only).
551+
// Before the fix, this would be 2 — an orphaned delayed message.
552+
delayedCount, err := f.tracker.GetDelayedCount()
553+
Require(t, err)
554+
if delayedCount != 1 {
555+
Fail(t, "Delayed count not rolled back after mismatch", delayedCount, "(expected 1)")
556+
}
557+
}
558+
559+
// TestDelayedMismatchNoOpRollback verifies that addMessages handles a mismatch
560+
// correctly even when no new delayed messages were provided. The rollback
561+
// should be a no-op (rolling back to the current count) without errors.
562+
func TestDelayedMismatchNoOpRollback(t *testing.T) {
563+
ctx, cancel := context.WithCancel(context.Background())
564+
defer cancel()
565+
f := newMismatchTestFixture(t, ctx)
566+
567+
reader := &InboxReader{tracker: f.tracker}
568+
delayedMismatch, err := reader.addMessages(
569+
ctx,
570+
[]*mel.SequencerInboxBatch{f.mismatchBatch},
571+
nil, // no new delayed messages
572+
)
573+
Require(t, err)
574+
if !delayedMismatch {
575+
Fail(t, "Expected delayedMismatch to be true")
576+
}
577+
578+
// Count should remain 1 (init message only, no rollback needed).
579+
delayedCount, err := f.tracker.GetDelayedCount()
580+
Require(t, err)
581+
if delayedCount != 1 {
582+
Fail(t, "Delayed count changed unexpectedly", delayedCount, "(expected 1)")
583+
}
584+
}
585+
586+
// TestDelayedMismatchAtTrackerLevel verifies that calling AddDelayedMessages
587+
// then AddSequencerBatches with a mismatched accumulator returns
588+
// delayedMessagesMismatch and leaves delayed messages in the DB. This
589+
// documents the low-level behavior that addMessages must compensate for.
590+
func TestDelayedMismatchAtTrackerLevel(t *testing.T) {
591+
ctx, cancel := context.WithCancel(context.Background())
592+
defer cancel()
593+
f := newMismatchTestFixture(t, ctx)
594+
595+
// Add the second delayed message — now count = 2
596+
err := f.tracker.AddDelayedMessages([]*mel.DelayedInboxMessage{f.userDelayed})
597+
Require(t, err)
598+
599+
delayedCount, err := f.tracker.GetDelayedCount()
600+
Require(t, err)
601+
if delayedCount != 2 {
602+
Fail(t, "Unexpected delayed count", delayedCount, "(expected 2)")
603+
}
604+
605+
// AddSequencerBatches should return delayedMessagesMismatch
606+
err = f.tracker.AddSequencerBatches(ctx, nil, []*mel.SequencerInboxBatch{f.mismatchBatch})
607+
if !errors.Is(err, delayedMessagesMismatch) {
608+
Fail(t, "Expected delayedMessagesMismatch error, got", err)
609+
}
610+
611+
// Delayed messages are still in the DB (AddSequencerBatches does not roll them back)
612+
delayedCount, err = f.tracker.GetDelayedCount()
613+
Require(t, err)
614+
if delayedCount != 2 {
615+
Fail(t, "Delayed messages should still be in DB", delayedCount, "(expected 2)")
616+
}
617+
618+
// ReorgDelayedTo cleans up the orphaned messages
619+
err = f.tracker.ReorgDelayedTo(1)
620+
Require(t, err)
621+
622+
delayedCount, err = f.tracker.GetDelayedCount()
623+
Require(t, err)
624+
if delayedCount != 1 {
625+
Fail(t, "ReorgDelayedTo did not clean up orphaned messages", delayedCount, "(expected 1)")
626+
}
627+
}
628+
629+
// TestAddMessages_GetDelayedCountError verifies that addMessages returns a
630+
// wrapped error when the initial GetDelayedCount call fails (e.g. closed DB).
631+
func TestAddMessages_GetDelayedCountError(t *testing.T) {
632+
ctx, cancel := context.WithCancel(context.Background())
633+
defer cancel()
634+
f := newMismatchTestFixture(t, ctx)
635+
636+
// Close the underlying DB so that GetDelayedCount fails.
637+
f.tracker.db.Close()
638+
639+
reader := &InboxReader{tracker: f.tracker}
640+
_, err := reader.addMessages(ctx, nil, nil)
641+
if err == nil {
642+
Fail(t, "Expected error from addMessages when GetDelayedCount fails")
643+
}
644+
if !strings.Contains(err.Error(), "getting delayed message count before adding messages") {
645+
Fail(t, "Expected wrapped error, got:", err)
646+
}
647+
}
648+
649+
// TestAddMessages_ReorgDelayedToError verifies that when addMessages detects a
650+
// delayed accumulator mismatch and the subsequent ReorgDelayedTo fails, the
651+
// returned error wraps the rollback error and includes the original mismatch.
652+
func TestAddMessages_ReorgDelayedToError(t *testing.T) {
653+
ctx, cancel := context.WithCancel(context.Background())
654+
defer cancel()
655+
f := newMismatchTestFixture(t, ctx)
656+
657+
// Wrap the DB so that the second batch.Write (ReorgDelayedTo) fails.
658+
// First batch.Write (AddDelayedMessages) succeeds normally.
659+
injectedErr := errors.New("injected write failure")
660+
f.tracker.db = &failingBatchDB{
661+
Database: f.tracker.db,
662+
writesBeforeFail: 1, // allow 1 successful Write, then fail
663+
writeErr: injectedErr,
664+
}
665+
666+
reader := &InboxReader{tracker: f.tracker}
667+
_, err := reader.addMessages(
668+
ctx,
669+
[]*mel.SequencerInboxBatch{f.mismatchBatch},
670+
[]*mel.DelayedInboxMessage{f.userDelayed},
671+
)
672+
if err == nil {
673+
Fail(t, "Expected error when ReorgDelayedTo fails during rollback")
674+
}
675+
if !errors.Is(err, injectedErr) {
676+
Fail(t, "Returned error should wrap the rollback error, got:", err)
677+
}
678+
if !strings.Contains(err.Error(), "failed to rollback delayed messages") {
679+
Fail(t, "Returned error should describe rollback failure, got:", err)
680+
}
681+
if !strings.Contains(err.Error(), "original mismatch") {
682+
Fail(t, "Returned error should include original mismatch error, got:", err)
683+
}
684+
if !errors.Is(err, delayedMessagesMismatch) {
685+
Fail(t, "Returned error should wrap the original mismatch error, got:", err)
686+
}
687+
}
688+
689+
// failingBatchDB wraps an ethdb.Database and makes batch Write() calls fail
690+
// after a configurable number of successful writes.
691+
type failingBatchDB struct {
692+
ethdb.Database
693+
writesBeforeFail int
694+
writeErr error
695+
writeCount atomic.Int32
696+
}
697+
698+
func (f *failingBatchDB) NewBatch() ethdb.Batch {
699+
return &failingBatch{Batch: f.Database.NewBatch(), parent: f}
700+
}
701+
702+
func (f *failingBatchDB) NewBatchWithSize(size int) ethdb.Batch {
703+
return &failingBatch{Batch: f.Database.NewBatchWithSize(size), parent: f}
704+
}
705+
706+
type failingBatch struct {
707+
ethdb.Batch
708+
parent *failingBatchDB
709+
}
710+
711+
func (b *failingBatch) Write() error {
712+
n := int(b.parent.writeCount.Add(1))
713+
if n > b.parent.writesBeforeFail {
714+
return b.parent.writeErr
715+
}
716+
return b.Batch.Write()
717+
}

arbnode/inbox_reader.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -594,8 +594,10 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
594594
}
595595
if delayedMismatch {
596596
reorgingDelayed = true
597-
}
598-
if len(sequencerBatches) > 0 {
597+
log.Debug("Skipping batch count update due to delayed message mismatch", "numBatches", len(sequencerBatches))
598+
// else-if: when a mismatch occurs the batches were not committed,
599+
// so we must not advance lastReadBatchCount.
600+
} else if len(sequencerBatches) > 0 {
599601
readAnyBatches = true
600602
r.lastReadBatchCount.Store(sequencerBatches[len(sequencerBatches)-1].SequenceNumber + 1)
601603
storeSeenBatchCount()
@@ -632,15 +634,30 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
632634
}
633635

634636
func (r *InboxReader) addMessages(ctx context.Context, sequencerBatches []*mel.SequencerInboxBatch, delayedMessages []*mel.DelayedInboxMessage) (bool, error) {
635-
err := r.tracker.AddDelayedMessages(delayedMessages)
637+
prevDelayedCount, err := r.tracker.GetDelayedCount()
638+
if err != nil {
639+
return false, fmt.Errorf("getting delayed message count before adding messages: %w", err)
640+
}
641+
err = r.tracker.AddDelayedMessages(delayedMessages)
636642
if err != nil {
637-
return false, err
643+
return false, fmt.Errorf("adding delayed messages: %w", err)
638644
}
639645
err = r.tracker.AddSequencerBatches(ctx, r.client, sequencerBatches)
640646
if errors.Is(err, delayedMessagesMismatch) {
647+
// Roll back the delayed messages that were just committed so they don't
648+
// remain as orphans in the DB without corresponding batches.
649+
if rollbackErr := r.tracker.ReorgDelayedTo(prevDelayedCount); rollbackErr != nil {
650+
return false, fmt.Errorf("failed to rollback delayed messages after sequencer batch mismatch (prevDelayedCount=%d): rollback: %w; original mismatch: %w", prevDelayedCount, rollbackErr, err)
651+
}
652+
log.Info("Rolled back delayed messages after sequencer batch mismatch",
653+
"prevDelayedCount", prevDelayedCount,
654+
"attemptedDelayedCount", prevDelayedCount+uint64(len(delayedMessages)),
655+
"numBatches", len(sequencerBatches),
656+
"mismatchErr", err,
657+
)
641658
return true, nil
642659
} else if err != nil {
643-
return false, err
660+
return false, fmt.Errorf("adding sequencer batches: %w", err)
644661
}
645662
return false, nil
646663
}

0 commit comments

Comments
 (0)