Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,15 @@ var (
Value: 86400,
EnvVars: prefixEnvVars("EL_TRIGGER_GAP"),
}
StartupDeferGossipFlag = &cli.BoolFlag{
Name: "startup.defer-gossip",
Usage: "Defers P2P gossip processing during startup until op-geth's unsafe head has caught up to" +
"the live tip via L1 derivation. This avoids the driver/alt-sync activity loop that occurs when an RPC node" +
"restarts with a large unsafe-head gap. Recommended for RPC and bridge nodes; not needed for sequencer and P2P nodes.",
EnvVars: prefixEnvVars("STARTUP_DEFER_GOSSIP"),
Value: false,
Comment thread
sysvm marked this conversation as resolved.
Outdated
Category: RollupCategory,
}
/* Deprecated Flags */
L2EngineSyncEnabled = &cli.BoolFlag{
Name: "l2.engine-sync",
Expand Down Expand Up @@ -440,6 +449,7 @@ var optionalFlags = []cli.Flag{
SyncModeFlag,
FastnodeMode,
ELTriggerGap,
StartupDeferGossipFlag,
RPCListenAddr,
RPCListenPort,
L1TrustRPC,
Expand Down
7 changes: 7 additions & 0 deletions op-node/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ type Config struct {

// Plasma DA config
Plasma plasma.CLIConfig

// StartupDeferGossip toggles the optional pre-gossip catch-up phase at startup.
// When true, op-node defers enabling gossip until op-geth's unsafe head has caught up
// to the live tip via L1 derivation, preventing the driver/alt-sync activity loop
// that otherwise occurs after restarts with a large unsafe-head gap.
// Recommended for RPC and bridge nodes; sequencer and P2P nodes should leave it disabled.
StartupDeferGossip bool
}

type RPCConfig struct {
Expand Down
144 changes: 139 additions & 5 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,48 @@ type OpNode struct {

closed atomic.Bool

// deferGossipEnabled mirrors cfg.StartupDeferGossip; controls whether Start() runs the pre-gossip catch-up phase.
deferGossipEnabled bool

// gossipReady gates incoming gossip payloads during the startup catch-up phase.
// While false, gossip payloads received via OnUnsafeL2Payload are silently dropped
// to prevent the clSync queue from accumulating orphan payloads (parent != op-geth.UnsafeL2Head)
// while op-geth's unsafe head is still being advanced via L1 derivation.
// Set to true once catch-up completes (or is disabled / times out).
gossipReady atomic.Bool

// firstPayloadAllowed lets exactly one gossip payload pass through OnUnsafeL2Payload
// while gossipReady is still false. This is required when running in ELSync mode:
// the engineController initial state is syncStatusWillStartEL, which causes
// IsEngineSyncing() to return true and prevents the driver eventLoop from running
// derivation (the stepReqCh handler short-circuits with `continue`). Until at least
// one payload reaches Driver.OnUnsafeL2Payload -> InsertUnsafePayload, the engine's
// "Skipping EL sync" finalized-block check never fires and syncStatus is stuck.
// Allowing exactly one payload through unblocks that transition and lets derivation
// drive op-geth forward during the catch-up phase. After this single payload,
// subsequent payloads continue to be dropped until gossipReady is flipped to true.
firstPayloadAllowed atomic.Bool

// cancels execution prematurely, e.g. to halt. This may be nil.
cancel context.CancelCauseFunc
halted atomic.Bool
}

const (
// catchupLagThreshold is how close op-geth's unsafe head timestamp must be
// to the current wall-clock time before gossip is enabled.
catchupLagThreshold = 30 * time.Second

// catchupMaxWait is the absolute maximum time we are willing to defer gossip.
// If catch-up does not complete within this window (e.g. L1 derivation is unhealthy),
// gossip is enabled regardless and the system degrades to the pre-fix behavior
// rather than blocking forever.
catchupMaxWait = 10 * time.Minute

// catchupPollInterval is how often we re-check op-geth's unsafe head during catch-up.
catchupPollInterval = 5 * time.Second
)

// The OpNode handles incoming gossip
var _ p2p.GossipIn = (*OpNode)(nil)

Expand All @@ -96,11 +133,18 @@ func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logge
}

n := &OpNode{
log: log,
appVersion: appVersion,
metrics: m,
rollupHalt: cfg.RollupHalt,
cancel: cfg.Cancel,
log: log,
appVersion: appVersion,
metrics: m,
rollupHalt: cfg.RollupHalt,
cancel: cfg.Cancel,
deferGossipEnabled: cfg.StartupDeferGossip,
}

// If defer gossip is disabled, gossip should be processed immediately as before.
// Set the gate to "ready" up front so OnUnsafeL2Payload behaves identically to the pre-fix code path.
if !n.deferGossipEnabled {
n.gossipReady.Store(true)
}
// not a context leak, gossipsub is closed with a context.
n.resourcesCtx, n.resourcesClose = context.WithCancel(context.Background())
Expand All @@ -117,6 +161,70 @@ func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logge
return n, nil
}

// waitForOpGethCatchUp blocks until op-geth's unsafe head timestamp is within
// catchUpLagThreshold of the current time, or until catchUpMaxWait elapses.
//
// Background:
// On RPC node restart, op-geth's unsafe head is frozen at the pre-restart height for
// the duration of the pod outage. When op-node comes back up and immediately subscribes
// to gossip, incoming gossip payloads have a parent that does not match op-geth's
// stale unsafe head; the clSync queue accumulates orphan payloads. The driver's
// checkForGapInUnsafeQueue then triggers alt-sync via an unbuffered rangeRequests
// channel, while alt-sync's mainLoop -- when promoting results back via receivePayload
// -- itself blocks on the driver's unsafeL2Payloads channel (buf=10). The two goroutines
// form a livelock that only releases through ctx timeouts, leaving the unsafe head
// stalled for some time.
//
// This function defers gossip subscription (via the gossipReady gate) until the L1
// derivation pipeline has advanced op-geth's unsafe head close enough to the live tip
// that no significant gap exists when gossip is finally enabled, eliminating the
// activity loop's preconditions at the source.
//
// Returns nil on successful catch-up; returns an error on context cancellation or timeout.
// In case of timeout, the caller should still enable gossip and degrade gracefully.
func (n *OpNode) waitForOpGethCatchUp(ctx context.Context) error {
n.log.Info("Starting op-geth catch-up phase before enabling gossip", "lag_threshold", catchupLagThreshold,
"max_wait", catchupMaxWait)

deadline := time.Now().Add(catchupMaxWait)
ticker := time.NewTicker(catchupPollInterval)
defer ticker.Stop()

for {
// Query op-geth's current unsafe head.
queryCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
unsafeHead, err := n.l2Source.L2BlockRefByLabel(queryCtx, eth.Unsafe)
cancel()

if err != nil {
n.log.Warn("Failed to query op-geth unsafe head during catch-up, will retry", "error", err)
} else {
headTime := time.Unix(int64(unsafeHead.Time), 0)
lag := time.Since(headTime)

// Treat negative lag (clock skew or future-timestamp head) as caught up.
if lag < catchupLagThreshold {
n.log.Info("op-geth caught up; enabling gossip", "unsafe_head", unsafeHead.Number, "lag", lag)
return nil
}

n.log.Info("op-geth still catching up via L1 derivation", "unsafe_head", unsafeHead.Number,
"lag", lag, "deadline_in", time.Until(deadline))
}

if time.Now().After(deadline) {
return fmt.Errorf("startup catch-up timeout after %v", catchupMaxWait)
}

select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
continue
}
}
}

func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
n.log.Info("Initializing rollup node", "version", n.appVersion)
if err := n.initTracer(ctx, cfg); err != nil {
Expand Down Expand Up @@ -484,6 +592,19 @@ func (n *OpNode) Start(ctx context.Context) error {
n.log.Error("Could not start a rollup node", "err", err)
return err
}

if n.deferGossipEnabled {
if err := n.waitForOpGethCatchUp(ctx); err != nil {
// Catch-up failed (e.g. timeout, L1 derivation unhealthy). Enable gossip anyway
// to avoid blocking the node forever; the system degrades to the pre-fix behavior.
n.log.Warn("Startup catch-up did not complete cleanly; enabling gossip anyway", "err", err)
}
n.gossipReady.Store(true)
n.log.Info("Gossip enabled; op-node fully active")
}
// If defer gossip is disabled, gossipReady was already set to true in New(),
// so OnUnsafeL2Payload behaves identically to the pre-fix code path.

log.Info("Rollup node started")
return nil
}
Expand Down Expand Up @@ -543,6 +664,19 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPa
}

func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *eth.ExecutionPayloadEnvelope) error {
// If defer gossip is enabled, drop gossip payloads received during the startup catch-up phase.
if !n.gossipReady.Load() {
if !n.firstPayloadAllowed.Swap(true) {
Comment thread
sysvm marked this conversation as resolved.
n.log.Info("Allowing first gossip payload through during catch-up to unblock engine sync state",
"peer", from, "id", envelope.ExecutionPayload.ID())
// fall through to the regular processing path below
} else {
n.log.Debug("Dropping gossip payload during startup catch-up phase", "peer", from,
"id", envelope.ExecutionPayload.ID())
return nil
}
}

// ignore if it's from ourselves
if n.p2pNode != nil && from == n.p2pNode.Host().ID() {
return nil
Expand Down
2 changes: 2 additions & 0 deletions op-node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
ConductorRpcTimeout: ctx.Duration(flags.ConductorRpcTimeoutFlag.Name),

Plasma: plasma.ReadCLIConfig(ctx),

StartupDeferGossip: ctx.Bool(flags.StartupDeferGossipFlag.Name),
}

if err := cfg.LoadPersisted(log); err != nil {
Expand Down
Loading