diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 0dbd9c4d2..a6d6841b2 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -360,6 +360,17 @@ var ( Value: 86400, EnvVars: prefixEnvVars("EL_TRIGGER_GAP"), } + StartupDeferGossipFlag = &cli.BoolFlag{ + Name: "startup.defer-gossip", + Usage: "Defers P2P gossip processing during startup until op-geth's unsafe head has caught up to " + + "the live tip via L1 derivation. This avoids the driver/alt-sync activity loop that occurs when a node " + + "restarts with a large unsafe-head gap. Default enabled for all node types (rpc / bridge / sequencer / p2p); " + + "the catch-up loop returns quickly when no gap exists, so the cost is negligible for nodes that don't need it. " + + "Set to false to opt out and restore the pre-fix startup behavior.", + EnvVars: prefixEnvVars("STARTUP_DEFER_GOSSIP"), + Value: true, + Category: RollupCategory, + } /* Deprecated Flags */ L2EngineSyncEnabled = &cli.BoolFlag{ Name: "l2.engine-sync", @@ -440,6 +451,7 @@ var optionalFlags = []cli.Flag{ SyncModeFlag, FastnodeMode, ELTriggerGap, + StartupDeferGossipFlag, RPCListenAddr, RPCListenPort, L1TrustRPC, diff --git a/op-node/node/config.go b/op-node/node/config.go index 4f67d0f78..b949666ad 100644 --- a/op-node/node/config.go +++ b/op-node/node/config.go @@ -77,6 +77,16 @@ type Config struct { // Plasma DA config Plasma plasma.CLIConfig + + // StartupDeferGossip toggles the pre-gossip catch-up phase at startup. + // When true (the default), op-node defers enabling gossip until op-geth's unsafe head + // has caught up to the live tip via L1 derivation, preventing the driver/alt-sync + // activity loop that otherwise occurs after restarts with a large unsafe-head gap. + // Enabled by default for all node types (rpc / bridge / sequencer / p2p); the catch-up + // loop returns immediately when op-geth is already at the live tip, so the cost for + // nodes that don't need it (e.g. a healthy sequencer) is negligible. Set to false only + // to deliberately restore the pre-fix startup behavior. + StartupDeferGossip bool } type RPCConfig struct { diff --git a/op-node/node/node.go b/op-node/node/node.go index 822aa7706..2254fdc8e 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -79,11 +79,48 @@ type OpNode struct { closed atomic.Bool + // deferGossipEnabled mirrors cfg.StartupDeferGossip; controls whether Start() runs the pre-gossip catch-up phase. + deferGossipEnabled bool + + // gossipReady gates incoming gossip payloads during the startup catch-up phase. + // While false, gossip payloads received via OnUnsafeL2Payload are silently dropped + // to prevent the clSync queue from accumulating orphan payloads (parent != op-geth.UnsafeL2Head) + // while op-geth's unsafe head is still being advanced via L1 derivation. + // Set to true once catch-up completes (or is disabled / times out). + gossipReady atomic.Bool + + // firstPayloadAllowed lets exactly one gossip payload pass through OnUnsafeL2Payload + // while gossipReady is still false. This is required when running in ELSync mode: + // the engineController initial state is syncStatusWillStartEL, which causes + // IsEngineSyncing() to return true and prevents the driver eventLoop from running + // derivation (the stepReqCh handler short-circuits with `continue`). Until at least + // one payload reaches Driver.OnUnsafeL2Payload -> InsertUnsafePayload, the engine's + // "Skipping EL sync" finalized-block check never fires and syncStatus is stuck. + // Allowing exactly one payload through unblocks that transition and lets derivation + // drive op-geth forward during the catch-up phase. After this single payload, + // subsequent payloads continue to be dropped until gossipReady is flipped to true. + firstPayloadAllowed atomic.Bool + // cancels execution prematurely, e.g. to halt. This may be nil. cancel context.CancelCauseFunc halted atomic.Bool } +const ( + // catchupLagThreshold is how close op-geth's unsafe head timestamp must be + // to the current wall-clock time before gossip is enabled. + catchupLagThreshold = 30 * time.Second + + // catchupMaxWait is the absolute maximum time we are willing to defer gossip. + // If catch-up does not complete within this window (e.g. L1 derivation is unhealthy), + // gossip is enabled regardless and the system degrades to the pre-fix behavior + // rather than blocking forever. + catchupMaxWait = 10 * time.Minute + + // catchupPollInterval is how often we re-check op-geth's unsafe head during catch-up. + catchupPollInterval = 5 * time.Second +) + // The OpNode handles incoming gossip var _ p2p.GossipIn = (*OpNode)(nil) @@ -96,11 +133,22 @@ func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logge } n := &OpNode{ - log: log, - appVersion: appVersion, - metrics: m, - rollupHalt: cfg.RollupHalt, - cancel: cfg.Cancel, + log: log, + appVersion: appVersion, + metrics: m, + rollupHalt: cfg.RollupHalt, + cancel: cfg.Cancel, + deferGossipEnabled: cfg.StartupDeferGossip, + } + + // Opt-out path: if the operator has explicitly disabled the startup defer-gossip phase + // (--startup.defer-gossip=false), flip the gate to "ready" immediately so that + // OnUnsafeL2Payload processes gossip without delay, matching the pre-fix code path exactly. + // In the default path (deferGossipEnabled=true), gossipReady stays at its zero value + // (false) here and is flipped to true at the end of Start() after waitForOpGethCatchUp + // completes (or times out). + if !n.deferGossipEnabled { + n.gossipReady.Store(true) } // not a context leak, gossipsub is closed with a context. n.resourcesCtx, n.resourcesClose = context.WithCancel(context.Background()) @@ -117,6 +165,70 @@ func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logge return n, nil } +// waitForOpGethCatchUp blocks until op-geth's unsafe head timestamp is within +// catchUpLagThreshold of the current time, or until catchUpMaxWait elapses. +// +// Background: +// On RPC node restart, op-geth's unsafe head is frozen at the pre-restart height for +// the duration of the pod outage. When op-node comes back up and immediately subscribes +// to gossip, incoming gossip payloads have a parent that does not match op-geth's +// stale unsafe head; the clSync queue accumulates orphan payloads. The driver's +// checkForGapInUnsafeQueue then triggers alt-sync via an unbuffered rangeRequests +// channel, while alt-sync's mainLoop -- when promoting results back via receivePayload +// -- itself blocks on the driver's unsafeL2Payloads channel (buf=10). The two goroutines +// form a livelock that only releases through ctx timeouts, leaving the unsafe head +// stalled for some time. +// +// This function defers gossip subscription (via the gossipReady gate) until the L1 +// derivation pipeline has advanced op-geth's unsafe head close enough to the live tip +// that no significant gap exists when gossip is finally enabled, eliminating the +// activity loop's preconditions at the source. +// +// Returns nil on successful catch-up; returns an error on context cancellation or timeout. +// In case of timeout, the caller should still enable gossip and degrade gracefully. +func (n *OpNode) waitForOpGethCatchUp(ctx context.Context) error { + n.log.Info("Starting op-geth catch-up phase before enabling gossip", "lag_threshold", catchupLagThreshold, + "max_wait", catchupMaxWait) + + deadline := time.Now().Add(catchupMaxWait) + ticker := time.NewTicker(catchupPollInterval) + defer ticker.Stop() + + for { + // Query op-geth's current unsafe head. + queryCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + unsafeHead, err := n.l2Source.L2BlockRefByLabel(queryCtx, eth.Unsafe) + cancel() + + if err != nil { + n.log.Warn("Failed to query op-geth unsafe head during catch-up, will retry", "error", err) + } else { + headTime := time.Unix(int64(unsafeHead.Time), 0) + lag := time.Since(headTime) + + // Treat negative lag (clock skew or future-timestamp head) as caught up. + if lag < catchupLagThreshold { + n.log.Info("op-geth caught up; enabling gossip", "unsafe_head", unsafeHead.Number, "lag", lag) + return nil + } + + n.log.Info("op-geth still catching up via L1 derivation", "unsafe_head", unsafeHead.Number, + "lag", lag, "deadline_in", time.Until(deadline)) + } + + if time.Now().After(deadline) { + return fmt.Errorf("startup catch-up timeout after %v", catchupMaxWait) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + continue + } + } +} + func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) error { n.log.Info("Initializing rollup node", "version", n.appVersion) if err := n.initTracer(ctx, cfg); err != nil { @@ -484,6 +596,17 @@ func (n *OpNode) Start(ctx context.Context) error { n.log.Error("Could not start a rollup node", "err", err) return err } + + if n.deferGossipEnabled { + if err := n.waitForOpGethCatchUp(ctx); err != nil { + // Catch-up failed (e.g. timeout, L1 derivation unhealthy). Enable gossip anyway + // to avoid blocking the node forever; the system degrades to the pre-fix behavior. + n.log.Warn("Startup catch-up did not complete cleanly; enabling gossip anyway", "err", err) + } + n.gossipReady.Store(true) + n.log.Info("Gossip enabled; op-node fully active") + } + log.Info("Rollup node started") return nil } @@ -543,11 +666,31 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPa } func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *eth.ExecutionPayloadEnvelope) error { - // ignore if it's from ourselves + // ignore if it's from ourselves. + // Note: this check intentionally runs BEFORE the gossipReady gate so that self-published + // payloads (sequencer publishing its own blocks via gossipsub, which loops back to the + // local subscriber) do not consume the firstPayloadAllowed slot. Without this ordering, + // a sequencer running in ELSync mode could waste its only "free" payload on a self-loop, + // leaving syncStatus stuck at WillStartEL. if n.p2pNode != nil && from == n.p2pNode.Host().ID() { return nil } + // Drop external gossip payloads received during the startup catch-up phase, except + // for the very first payload which is needed to trigger the ELSync syncStatus + // transition inside InsertUnsafePayload (see firstPayloadAllowed comment on OpNode). + if !n.gossipReady.Load() { + if !n.firstPayloadAllowed.Swap(true) { + n.log.Info("Allowing first gossip payload through during catch-up to unblock engine sync state", + "peer", from, "id", envelope.ExecutionPayload.ID()) + // fall through to the regular processing path below + } else { + n.log.Debug("Dropping gossip payload during startup catch-up phase", "peer", from, + "id", envelope.ExecutionPayload.ID()) + return nil + } + } + n.tracer.OnUnsafeL2Payload(ctx, from, envelope) n.log.Info("Received signed execution payload from p2p", "id", envelope.ExecutionPayload.ID(), "peer", from) diff --git a/op-node/service.go b/op-node/service.go index e162834c5..53f29429e 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -115,6 +115,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ConductorRpcTimeout: ctx.Duration(flags.ConductorRpcTimeoutFlag.Name), Plasma: plasma.ReadCLIConfig(ctx), + + StartupDeferGossip: ctx.Bool(flags.StartupDeferGossipFlag.Name), } if err := cfg.LoadPersisted(log); err != nil {