Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,17 @@ var (
Value: 86400,
EnvVars: prefixEnvVars("EL_TRIGGER_GAP"),
}
StartupDeferGossipFlag = &cli.BoolFlag{
Name: "startup.defer-gossip",
Usage: "Defers P2P gossip processing during startup until op-geth's unsafe head has caught up to " +
"the live tip via L1 derivation. This avoids the driver/alt-sync activity loop that occurs when a node " +
"restarts with a large unsafe-head gap. Default enabled for all node types (rpc / bridge / sequencer / p2p); " +
"the catch-up loop returns quickly when no gap exists, so the cost is negligible for nodes that don't need it. " +
"Set to false to opt out and restore the pre-fix startup behavior.",
EnvVars: prefixEnvVars("STARTUP_DEFER_GOSSIP"),
Value: true,
Category: RollupCategory,
}
/* Deprecated Flags */
L2EngineSyncEnabled = &cli.BoolFlag{
Name: "l2.engine-sync",
Expand Down Expand Up @@ -440,6 +451,7 @@ var optionalFlags = []cli.Flag{
SyncModeFlag,
FastnodeMode,
ELTriggerGap,
StartupDeferGossipFlag,
RPCListenAddr,
RPCListenPort,
L1TrustRPC,
Expand Down
10 changes: 10 additions & 0 deletions op-node/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ type Config struct {

// Plasma DA config
Plasma plasma.CLIConfig

// StartupDeferGossip toggles the pre-gossip catch-up phase at startup.
// When true (the default), op-node defers enabling gossip until op-geth's unsafe head
// has caught up to the live tip via L1 derivation, preventing the driver/alt-sync
// activity loop that otherwise occurs after restarts with a large unsafe-head gap.
// Enabled by default for all node types (rpc / bridge / sequencer / p2p); the catch-up
// loop returns immediately when op-geth is already at the live tip, so the cost for
// nodes that don't need it (e.g. a healthy sequencer) is negligible. Set to false only
// to deliberately restore the pre-fix startup behavior.
StartupDeferGossip bool
}

type RPCConfig struct {
Expand Down
155 changes: 149 additions & 6 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,48 @@ type OpNode struct {

closed atomic.Bool

// deferGossipEnabled mirrors cfg.StartupDeferGossip; controls whether Start() runs the pre-gossip catch-up phase.
deferGossipEnabled bool

// gossipReady gates incoming gossip payloads during the startup catch-up phase.
// While false, gossip payloads received via OnUnsafeL2Payload are silently dropped
// to prevent the clSync queue from accumulating orphan payloads (parent != op-geth.UnsafeL2Head)
// while op-geth's unsafe head is still being advanced via L1 derivation.
// Set to true once catch-up completes (or is disabled / times out).
gossipReady atomic.Bool

// firstPayloadAllowed lets exactly one gossip payload pass through OnUnsafeL2Payload
// while gossipReady is still false. This is required when running in ELSync mode:
// the engineController initial state is syncStatusWillStartEL, which causes
// IsEngineSyncing() to return true and prevents the driver eventLoop from running
// derivation (the stepReqCh handler short-circuits with `continue`). Until at least
// one payload reaches Driver.OnUnsafeL2Payload -> InsertUnsafePayload, the engine's
// "Skipping EL sync" finalized-block check never fires and syncStatus is stuck.
// Allowing exactly one payload through unblocks that transition and lets derivation
// drive op-geth forward during the catch-up phase. After this single payload,
// subsequent payloads continue to be dropped until gossipReady is flipped to true.
firstPayloadAllowed atomic.Bool

// cancels execution prematurely, e.g. to halt. This may be nil.
cancel context.CancelCauseFunc
halted atomic.Bool
}

const (
// catchupLagThreshold is how close op-geth's unsafe head timestamp must be
// to the current wall-clock time before gossip is enabled.
catchupLagThreshold = 30 * time.Second

// catchupMaxWait is the absolute maximum time we are willing to defer gossip.
// If catch-up does not complete within this window (e.g. L1 derivation is unhealthy),
// gossip is enabled regardless and the system degrades to the pre-fix behavior
// rather than blocking forever.
catchupMaxWait = 10 * time.Minute

// catchupPollInterval is how often we re-check op-geth's unsafe head during catch-up.
catchupPollInterval = 5 * time.Second
)

// The OpNode handles incoming gossip
var _ p2p.GossipIn = (*OpNode)(nil)

Expand All @@ -96,11 +133,22 @@ func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logge
}

n := &OpNode{
log: log,
appVersion: appVersion,
metrics: m,
rollupHalt: cfg.RollupHalt,
cancel: cfg.Cancel,
log: log,
appVersion: appVersion,
metrics: m,
rollupHalt: cfg.RollupHalt,
cancel: cfg.Cancel,
deferGossipEnabled: cfg.StartupDeferGossip,
}

// Opt-out path: if the operator has explicitly disabled the startup defer-gossip phase
// (--startup.defer-gossip=false), flip the gate to "ready" immediately so that
// OnUnsafeL2Payload processes gossip without delay, matching the pre-fix code path exactly.
// In the default path (deferGossipEnabled=true), gossipReady stays at its zero value
// (false) here and is flipped to true at the end of Start() after waitForOpGethCatchUp
// completes (or times out).
if !n.deferGossipEnabled {
n.gossipReady.Store(true)
}
// not a context leak, gossipsub is closed with a context.
n.resourcesCtx, n.resourcesClose = context.WithCancel(context.Background())
Expand All @@ -117,6 +165,70 @@ func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logge
return n, nil
}

// waitForOpGethCatchUp blocks until op-geth's unsafe head timestamp is within
// catchUpLagThreshold of the current time, or until catchUpMaxWait elapses.
//
// Background:
// On RPC node restart, op-geth's unsafe head is frozen at the pre-restart height for
// the duration of the pod outage. When op-node comes back up and immediately subscribes
// to gossip, incoming gossip payloads have a parent that does not match op-geth's
// stale unsafe head; the clSync queue accumulates orphan payloads. The driver's
// checkForGapInUnsafeQueue then triggers alt-sync via an unbuffered rangeRequests
// channel, while alt-sync's mainLoop -- when promoting results back via receivePayload
// -- itself blocks on the driver's unsafeL2Payloads channel (buf=10). The two goroutines
// form a livelock that only releases through ctx timeouts, leaving the unsafe head
// stalled for some time.
//
// This function defers gossip subscription (via the gossipReady gate) until the L1
// derivation pipeline has advanced op-geth's unsafe head close enough to the live tip
// that no significant gap exists when gossip is finally enabled, eliminating the
// activity loop's preconditions at the source.
//
// Returns nil on successful catch-up; returns an error on context cancellation or timeout.
// In case of timeout, the caller should still enable gossip and degrade gracefully.
func (n *OpNode) waitForOpGethCatchUp(ctx context.Context) error {
n.log.Info("Starting op-geth catch-up phase before enabling gossip", "lag_threshold", catchupLagThreshold,
"max_wait", catchupMaxWait)

deadline := time.Now().Add(catchupMaxWait)
ticker := time.NewTicker(catchupPollInterval)
defer ticker.Stop()

for {
// Query op-geth's current unsafe head.
queryCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
unsafeHead, err := n.l2Source.L2BlockRefByLabel(queryCtx, eth.Unsafe)
cancel()

if err != nil {
n.log.Warn("Failed to query op-geth unsafe head during catch-up, will retry", "error", err)
} else {
headTime := time.Unix(int64(unsafeHead.Time), 0)
lag := time.Since(headTime)

// Treat negative lag (clock skew or future-timestamp head) as caught up.
if lag < catchupLagThreshold {
n.log.Info("op-geth caught up; enabling gossip", "unsafe_head", unsafeHead.Number, "lag", lag)
return nil
}

n.log.Info("op-geth still catching up via L1 derivation", "unsafe_head", unsafeHead.Number,
"lag", lag, "deadline_in", time.Until(deadline))
}

if time.Now().After(deadline) {
return fmt.Errorf("startup catch-up timeout after %v", catchupMaxWait)
}

select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
continue
}
}
}

func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
n.log.Info("Initializing rollup node", "version", n.appVersion)
if err := n.initTracer(ctx, cfg); err != nil {
Expand Down Expand Up @@ -484,6 +596,17 @@ func (n *OpNode) Start(ctx context.Context) error {
n.log.Error("Could not start a rollup node", "err", err)
return err
}

if n.deferGossipEnabled {
if err := n.waitForOpGethCatchUp(ctx); err != nil {
// Catch-up failed (e.g. timeout, L1 derivation unhealthy). Enable gossip anyway
// to avoid blocking the node forever; the system degrades to the pre-fix behavior.
n.log.Warn("Startup catch-up did not complete cleanly; enabling gossip anyway", "err", err)
}
n.gossipReady.Store(true)
n.log.Info("Gossip enabled; op-node fully active")
}

log.Info("Rollup node started")
return nil
}
Expand Down Expand Up @@ -543,11 +666,31 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPa
}

func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *eth.ExecutionPayloadEnvelope) error {
// ignore if it's from ourselves
// ignore if it's from ourselves.
// Note: this check intentionally runs BEFORE the gossipReady gate so that self-published
// payloads (sequencer publishing its own blocks via gossipsub, which loops back to the
// local subscriber) do not consume the firstPayloadAllowed slot. Without this ordering,
// a sequencer running in ELSync mode could waste its only "free" payload on a self-loop,
// leaving syncStatus stuck at WillStartEL.
if n.p2pNode != nil && from == n.p2pNode.Host().ID() {
return nil
}

// Drop external gossip payloads received during the startup catch-up phase, except
// for the very first payload which is needed to trigger the ELSync syncStatus
// transition inside InsertUnsafePayload (see firstPayloadAllowed comment on OpNode).
if !n.gossipReady.Load() {
if !n.firstPayloadAllowed.Swap(true) {
Comment thread
sysvm marked this conversation as resolved.
n.log.Info("Allowing first gossip payload through during catch-up to unblock engine sync state",
"peer", from, "id", envelope.ExecutionPayload.ID())
// fall through to the regular processing path below
} else {
n.log.Debug("Dropping gossip payload during startup catch-up phase", "peer", from,
"id", envelope.ExecutionPayload.ID())
return nil
}
}

n.tracer.OnUnsafeL2Payload(ctx, from, envelope)

n.log.Info("Received signed execution payload from p2p", "id", envelope.ExecutionPayload.ID(), "peer", from)
Expand Down
2 changes: 2 additions & 0 deletions op-node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
ConductorRpcTimeout: ctx.Duration(flags.ConductorRpcTimeoutFlag.Name),

Plasma: plasma.ReadCLIConfig(ctx),

StartupDeferGossip: ctx.Bool(flags.StartupDeferGossipFlag.Name),
}

if err := cfg.LoadPersisted(log); err != nil {
Expand Down
Loading