From 5f15191b2ba83014915131c0cf6a31d994e7bfc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erce=20Can=20Bekt=C3=BCre?= Date: Fri, 5 Jun 2026 11:13:58 +0300 Subject: [PATCH 1/8] WIP: Implement listen mode to producer sequencer rpc toggle --- bin/citrea/src/main.rs | 11 +- bin/citrea/src/rollup/mod.rs | 27 +-- crates/common/src/lib.rs | 34 ++++ crates/sequencer/src/lib.rs | 48 ++++- crates/sequencer/src/listen_mode/mod.rs | 247 +++++++++++++++++++++--- crates/sequencer/src/rpc.rs | 128 +++++++++++- crates/sequencer/src/runner.rs | 7 + crates/sequencer/src/types.rs | 9 + 8 files changed, 456 insertions(+), 55 deletions(-) diff --git a/bin/citrea/src/main.rs b/bin/citrea/src/main.rs index d78d755727..befc374dce 100644 --- a/bin/citrea/src/main.rs +++ b/bin/citrea/src/main.rs @@ -301,9 +301,14 @@ where info!("Starting listen mode sequencer"); start_rpc_server(rollup_config.rpc.clone(), &task_executor, rpc_module, None); - if let Err(e) = listen_mode_sequencer.run().await { - error!("Error: {}", e); - } + task_executor.spawn_critical_with_graceful_shutdown_signal( + "listen_mode_sequencer", + |shutdown_signal| async move { + if let Err(e) = listen_mode_sequencer.run(shutdown_signal).await { + error!("Error: {}", e); + } + }, + ); } (SequencerType::Normal(mut sequencer), rpc_module) => { info!("Starting sequencer"); diff --git a/bin/citrea/src/rollup/mod.rs b/bin/citrea/src/rollup/mod.rs index aed51bd7ec..0c00862ec6 100644 --- a/bin/citrea/src/rollup/mod.rs +++ b/bin/citrea/src/rollup/mod.rs @@ -454,29 +454,22 @@ pub trait CitreaRollupBlueprint: RollupBlueprint { ) -> anyhow::Result { let prover_storage = storage_manager.create_storage_for_next_l2_height(); - if let Some((number, l2_block)) = ledger_db.get_head_l2_block()? { - // At least one l2 block was processed + if ledger_db.get_head_l2_block()?.is_some() { + // At least one l2 block was processed: derive params from the persisted state. + let init_params = + citrea_common::read_init_params_from_db(ledger_db, storage_manager)?; info!( - "Initialize node at L2 height #{}. State root: 0x{}. Last l2 block hash: 0x{}.", - number.0, - hex::encode(prover_storage.get_root_hash(number.0 + 1)?), - hex::encode(l2_block.hash) + "Initialize node. State root: 0x{}. Last l2 block hash: 0x{}.", + hex::encode(init_params.prev_state_root), + hex::encode(init_params.prev_l2_block_hash) ); - - return Ok(InitParams { - prev_state_root: prover_storage.get_root_hash(number.0 + 1)?, - prev_l2_block_hash: l2_block.hash, - }); + return Ok(init_params); } - let genesis_root = prover_storage.get_root_hash(1); - if let Ok(prev_state_root) = genesis_root { + if prover_storage.get_root_hash(1).is_ok() { // Chain was initialized but no L2 blocks were processed debug!("Chain is already initialized. Skipping initialization."); - return Ok(InitParams { - prev_state_root, - prev_l2_block_hash: [0; 32], - }); + return citrea_common::read_init_params_from_db(ledger_db, storage_manager); } info!("No history detected. Initializing chain...",); diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 0ec3d26e8a..1ccee15fd3 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -11,7 +11,10 @@ pub mod utils; pub use config::*; use serde::{Deserialize, Serialize}; +use sov_db::ledger_db::SharedLedgerOps; +use sov_prover_storage_manager::ProverStorageManager; use sov_rollup_interface::zk::StorageRootHash; +use sov_state::storage::NativeStorage; type L2BlockHash = [u8; 32]; @@ -22,6 +25,37 @@ pub struct InitParams { pub prev_l2_block_hash: L2BlockHash, } +/// Derives [`InitParams`] from the already-persisted ledger and storage state, **without** +/// (re)initializing genesis. +/// +/// This is the common case used both on normal node startup when a chain already exists, and when +/// a listen-mode sequencer is promoted to a block-producing sequencer at runtime: in both cases the +/// state has already been written to the DB and we only need to read back the current head's state +/// root and l2 block hash. +/// +/// Callers that need to perform genesis initialization (fresh chain) must handle that separately; +/// this helper assumes the chain already has state. +pub fn read_init_params_from_db( + ledger_db: &DB, + storage_manager: &ProverStorageManager, +) -> anyhow::Result { + let prover_storage = storage_manager.create_storage_for_next_l2_height(); + + if let Some((number, l2_block)) = ledger_db.get_head_l2_block()? { + // At least one l2 block was processed. + return Ok(InitParams { + prev_state_root: prover_storage.get_root_hash(number.0 + 1)?, + prev_l2_block_hash: l2_block.hash, + }); + } + + // Chain was initialized but no L2 blocks were processed yet. + Ok(InitParams { + prev_state_root: prover_storage.get_root_hash(1)?, + prev_l2_block_hash: [0; 32], + }) +} + /// Variant to specify how to start processing L1 blocks pub enum StartVariant { /// Resume from the last scanned L1 block height, the following L1 block will be the next one to process. diff --git a/crates/sequencer/src/lib.rs b/crates/sequencer/src/lib.rs index cee94421ab..5bba41e73a 100644 --- a/crates/sequencer/src/lib.rs +++ b/crates/sequencer/src/lib.rs @@ -33,6 +33,7 @@ //! Transition Function's inner workings, allowing it to preview transaction results before //! finalizing L2 blocks. +use std::sync::atomic::AtomicBool; use std::sync::Arc; use anyhow::Result; @@ -47,7 +48,7 @@ use deposit_data_mempool::DepositDataMempool; use jsonrpsee::RpcModule; use listen_mode::l1_syncer::L1Syncer; use listen_mode::mempool_syncer::MempoolSyncer; -use listen_mode::ListenModeSequencer; +use listen_mode::{ListenModeSequencer, ProducerParts}; use mempool::CitreaMempool; use parking_lot::Mutex; use reth_provider::CanonStateNotification; @@ -178,6 +179,24 @@ where task_executor.spawn_critical("mempool-maintenance", maintenance_future); } + // In listen mode, expose a conversion handle so the RPC layer can promote this node to a + // producer at runtime (after verifying the main sequencer is unreachable). + let conversion = if is_listen_mode { + let sequencer_url = sequencer_config + .listen_mode_config + .as_ref() + .expect("Listen Mode Config must be set in listen mode") + .sequencer_client_url + .clone(); + Some(rpc::ConversionHandle { + sequencer_url, + convert_tx: rpc_message_tx.clone(), + started: Arc::new(AtomicBool::new(false)), + }) + } else { + None + }; + let rpc_storage = storage_manager.create_final_view_storage(); let rpc_context = rpc::create_rpc_context( mempool.clone(), @@ -189,6 +208,7 @@ where l2_block_tx.subscribe(), mempool_transaction_tx.clone(), mempool_transaction_tx.subscribe(), + conversion, ); let rpc_module = rpc::register_rpc_methods(rpc_context, rpc_module)?; @@ -202,6 +222,30 @@ where .clone() .expect("Listen Mode Config must be set in listen mode"); + // Producer configuration used if/when this node is promoted to producer: identical config + // but no longer in listen mode. + let mut producer_config = sequencer_config.clone(); + producer_config.listen_mode_config = None; + + // Capture everything needed to build a producer at conversion time, before the syncers + // below consume the originals. + let producer_parts = ProducerParts { + da_service: da_service.clone(), + config: producer_config, + public_keys: public_keys.clone(), + storage_manager: storage_manager.clone(), + ledger_db: ledger_db.clone(), + db_provider, + mempool: mempool.clone(), + deposit_mempool: deposit_mempool.clone(), + l2_block_tx: l2_block_tx.clone(), + mempool_transaction_tx: mempool_transaction_tx.clone(), + backup_manager: backup_manager.clone(), + rpc_message_rx, + canon_state_tx, + task_executor, + }; + let l2_syncer = L2Syncer::new( listen_mode_config.sequencer_client_url.clone(), listen_mode_config.sync_blocks_count, @@ -244,8 +288,8 @@ where l2_syncer, l1_syncer, mempool_syncer, - task_executor, ledger_db.clone(), + producer_parts, ); Ok((SequencerType::ListenMode(listen_mode_sequencer), rpc_module)) } else { diff --git a/crates/sequencer/src/listen_mode/mod.rs b/crates/sequencer/src/listen_mode/mod.rs index 057d38c543..a5e46d4c5a 100644 --- a/crates/sequencer/src/listen_mode/mod.rs +++ b/crates/sequencer/src/listen_mode/mod.rs @@ -21,14 +21,36 @@ //! Normally producer sequencer stores all the mempool transactions in persistent storage as well to recover them in case of crashes and restarts //! For that reason listen mode sequencer also stores all mempool transactions in its own persistent storage, updates the persistent storage regularly and does not keep in block txs in that storage //! When restarted as producer sequencer, it will put all the txs in the persistent storage back into mempool +use std::sync::Arc; use std::time::Duration; use citrea_common::l2::{AppliedL2Block, L2BlockProcessor, L2Syncer}; +use citrea_common::{read_init_params_from_db, RollupPublicKeys, SequencerConfig}; +use citrea_primitives::forks::get_forks; +use citrea_stf::runtime::{CitreaRuntime, DefaultContext}; use l1_syncer::L1Syncer; use mempool_syncer::MempoolSyncer; -use reth_tasks::TaskExecutor; +use parking_lot::Mutex; +use reth_provider::CanonStateNotification; +use reth_tasks::shutdown::GracefulShutdown; +use reth_tasks::{TaskExecutor, TaskManager}; +use sov_db::ledger_db::{LedgerDB, SequencerLedgerOps, SharedLedgerOps}; use sov_db::schema::types::L2BlockNumber; +use sov_modules_stf_blueprint::StfBlueprint; +use sov_prover_storage_manager::ProverStorageManager; +use sov_rollup_interface::fork::ForkManager; +use sov_rollup_interface::rpc::MempoolTransactionSignal; use sov_rollup_interface::services::da::DaService; +use tokio::runtime::Handle; +use tokio::sync::broadcast; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tracing::{info, warn}; + +use crate::db_provider::DbProvider; +use crate::deposit_data_mempool::DepositDataMempool; +use crate::mempool::CitreaMempool; +use crate::types::SequencerRpcMessage; +use crate::CitreaSequencer; use super::metrics::SEQUENCER_METRICS as SM; @@ -59,18 +81,102 @@ where } } +/// Everything needed to build a block-producing [`CitreaSequencer`] when a listen-mode sequencer is +/// promoted to producer at runtime. +/// +/// The listen-mode syncers consume the original `init_params`, `StfBlueprint` and `ForkManager`, so +/// those are re-derived at conversion time (see [`build_producer`]). This struct holds the rest of +/// the handles — all cheaply cloneable or single-owner — so that no process restart is required to +/// start producing blocks. +pub struct ProducerParts { + /// Data availability service. + pub da_service: Arc, + /// Producer sequencer configuration (with `listen_mode_config` cleared). + pub config: SequencerConfig, + /// Rollup public keys. + pub public_keys: RollupPublicKeys, + /// Prover storage manager (shares the same backing DBs as the syncers). + pub storage_manager: ProverStorageManager, + /// Ledger database. + pub ledger_db: LedgerDB, + /// Database provider used by the mempool. + pub db_provider: DbProvider, + /// Transaction mempool (shared with the RPC layer). + pub mempool: Arc, + /// Deposit transaction mempool. + pub deposit_mempool: Arc>, + /// Broadcast sender for L2 block notifications. + pub l2_block_tx: broadcast::Sender, + /// Broadcast sender for mempool transaction notifications. + pub mempool_transaction_tx: broadcast::Sender, + /// Backup manager. + pub backup_manager: Arc, + /// Receiver for RPC control messages. Used by the orchestrator to receive the convert signal, + /// then handed to the producer to receive halt/resume/test-block messages. + pub rpc_message_rx: UnboundedReceiver, + /// Canonical state notification sender for mempool maintenance. + pub canon_state_tx: UnboundedSender, + /// Task executor used by the producer to spawn its background tasks. + pub task_executor: TaskExecutor, +} + +/// Builds a block-producing [`CitreaSequencer`] from [`ProducerParts`], re-deriving the pieces that +/// were consumed by the listen-mode syncers (`InitParams` read back from the DB, a fresh +/// `StfBlueprint`, and a fresh `ForkManager` at the current head height). +/// +/// The caller MUST have stopped the listen-mode syncers and awaited their completion before calling +/// this, so the head state read here is final and there is only ever a single writer to the ledger. +fn build_producer(parts: ProducerParts) -> anyhow::Result> { + let current_l2_height = parts + .ledger_db + .get_head_l2_block()? + .map(|(l2_height, _)| l2_height.0) + .unwrap_or(0); + + let mut fork_manager = ForkManager::new(get_forks(), current_l2_height); + fork_manager.register_handler(Box::new(parts.ledger_db.clone())); + + let native_stf = StfBlueprint::>::new(); + let init_params = read_init_params_from_db(&parts.ledger_db, &parts.storage_manager)?; + + CitreaSequencer::new( + parts.da_service, + parts.config, + init_params, + native_stf, + parts.storage_manager, + parts.public_keys, + parts.ledger_db, + parts.db_provider, + parts.mempool, + parts.deposit_mempool, + fork_manager, + parts.l2_block_tx, + parts.mempool_transaction_tx, + parts.backup_manager, + parts.rpc_message_rx, + parts.canon_state_tx, + parts.task_executor, + ) +} + /// Listen Mode Sequencer that synchronizes both L1 and L2 blocks and commitments /// This struct encapsulates the L1 and L2 block synchronization services /// and provides a run loop for processing incoming L1 and L2 blocks and commitments. /// It is designed to maintain the sequencer's state in listen mode. /// +/// In addition to staying in sync, it can be promoted to a block-producing sequencer at runtime via +/// the `citrea_convertToProducer` RPC (see [`ProducerParts`] and [`build_producer`]). On promotion +/// it stops its syncers and starts producing blocks from the state it has already synced, without a +/// process restart. +/// /// # Type Parameters /// * `DA` - Data Availability service type /// * `DB` - Database type that implements `SequencerLedgerOps` for ledger operations pub struct ListenModeSequencer where DA: DaService, - DB: sov_db::ledger_db::SequencerLedgerOps + Clone + Send + Sync + 'static, + DB: SequencerLedgerOps + Clone + Send + Sync + 'static, { /// L2 block synchronization service for the listen mode sequencer pub l2_syncer: ListenModeSequencerL2Syncer, @@ -78,68 +184,149 @@ where pub l1_syncer: L1Syncer, /// Mempool synchronization service for the listen mode sequencer pub mempool_syncer: MempoolSyncer, - /// Task executor for running asynchronous tasks - pub task_executor: TaskExecutor, /// Database for ledger operations pub ledger_db: DB, + /// Handles required to build a producer sequencer on conversion. + pub producer_parts: ProducerParts, } impl ListenModeSequencer where DA: DaService, - DB: sov_db::ledger_db::SequencerLedgerOps + Clone + Send + Sync + 'static, + DB: SequencerLedgerOps + Clone + Send + Sync + 'static, { /// Creates a new Listen Mode Sequencer instance /// /// # Arguments /// * `l2_syncer` - L2 block synchronization service /// * `l1_syncer` - L1 block synchronization service - /// * `task_executor` - Task executor for running asynchronous tasks + /// * `mempool_syncer` - Mempool synchronization service + /// * `ledger_db` - Database for ledger operations + /// * `producer_parts` - Handles needed to build a producer sequencer on conversion pub fn new( l2_syncer: ListenModeSequencerL2Syncer, l1_syncer: L1Syncer, mempool_syncer: MempoolSyncer, - task_executor: TaskExecutor, ledger_db: DB, + producer_parts: ProducerParts, ) -> Self { Self { l2_syncer, l1_syncer, mempool_syncer, - task_executor, ledger_db, + producer_parts, } } - /// Main Listen Mode Sequencer run loop + /// Main Listen Mode Sequencer run loop. + /// + /// Spawns the L2/L1/mempool syncers under a dedicated [`TaskManager`] so they can be stopped + /// independently of the process shutdown, then waits for either: + /// - the process `shutdown_signal`, in which case it stops the syncers and returns, or + /// - a [`SequencerRpcMessage::ConvertToProducer`] signal, in which case it stops the syncers, + /// builds a producer sequencer from the synced state, and hands off to its run loop. /// /// # Arguments - /// * `shutdown_signal` - Signal for graceful shutdown - pub async fn run(self) -> Result<(), anyhow::Error> { - // Start L2 syncer task - self.task_executor - .spawn_critical_with_graceful_shutdown_signal( - "listen_mode_sequencer_l2_syncer", - |shutdown_signal| async move { self.l2_syncer.run(shutdown_signal).await }, - ); + /// * `shutdown_signal` - Signal for graceful shutdown of the whole node + pub async fn run(self, mut shutdown_signal: GracefulShutdown) -> Result<(), anyhow::Error> { + let ListenModeSequencer { + l2_syncer, + l1_syncer, + mempool_syncer, + ledger_db, + mut producer_parts, + } = self; - self.task_executor - .spawn_with_graceful_shutdown_signal(|shutdown_signal| async move { - self.mempool_syncer.run(shutdown_signal).await - }); + // Dedicated task manager so the syncers can be shut down independently of the node, which is + // required to release the ledger as the single writer before the producer starts. + let syncer_manager = TaskManager::new(Handle::current()); + let syncer_executor = syncer_manager.executor(); - while self.ledger_db.get_head_l2_block_height()?.unwrap_or(0) < 1 { - // Wait until one block to be processed before starting L1 syncer - tokio::time::sleep(Duration::from_millis(100)).await; - } + // Start L2 syncer task + syncer_executor.spawn_critical_with_graceful_shutdown_signal( + "listen_mode_sequencer_l2_syncer", + |shutdown_signal| async move { l2_syncer.run(shutdown_signal).await }, + ); + + // Start mempool syncer task + syncer_executor.spawn_with_graceful_shutdown_signal(|shutdown_signal| async move { + mempool_syncer.run(shutdown_signal).await + }); - // Start L1 syncer task - self.task_executor - .spawn_critical_with_graceful_shutdown_signal( + // Start L1 syncer task once at least one L2 block has been processed. + { + let ledger_db = ledger_db.clone(); + syncer_executor.spawn_critical_with_graceful_shutdown_signal( "listen_mode_sequencer_l1_syncer", - |shutdown_signal| async move { self.l1_syncer.run(shutdown_signal).await }, + |shutdown_signal| async move { + while ledger_db + .get_head_l2_block_height() + .ok() + .flatten() + .unwrap_or(0) + < 1 + { + tokio::time::sleep(Duration::from_millis(100)).await; + } + l1_syncer.run(shutdown_signal).await + }, ); + } - Ok(()) + let mut syncer_manager = Some(syncer_manager); + let mut control_closed = false; + loop { + tokio::select! { + _ = &mut shutdown_signal => { + info!("Shutting down listen mode sequencer"); + if let Some(manager) = syncer_manager.take() { + let _ = tokio::task::spawn_blocking(move || manager.graceful_shutdown()).await; + } + return Ok(()); + } + msg = producer_parts.rpc_message_rx.recv(), if !control_closed => { + match msg { + Some(SequencerRpcMessage::ConvertToProducer { ack }) => { + info!("Listen mode sequencer: received convert-to-producer signal"); + + // Stop the syncers and wait for them to fully drain so the producer + // starts from the final head state with no competing writer. + if let Some(manager) = syncer_manager.take() { + if let Err(e) = + tokio::task::spawn_blocking(move || manager.graceful_shutdown()) + .await + { + let msg = format!("failed to stop listen-mode syncers: {e}"); + let _ = ack.send(Err(msg.clone())); + return Err(anyhow::anyhow!(msg)); + } + } + + match build_producer(producer_parts) { + Ok(mut producer) => { + let _ = ack.send(Ok(())); + info!("Listen mode sequencer: converted to producer, starting block production"); + return producer.run(shutdown_signal).await; + } + Err(e) => { + let _ = ack.send(Err(format!("failed to build producer: {e}"))); + return Err(e); + } + } + } + Some(_) => { + // Other RPC control messages (halt/resume/test block) are not + // applicable while in listen mode; ignore them. + warn!("Listen mode sequencer: ignoring unsupported RPC control message"); + } + None => { + warn!("Listen mode sequencer: RPC control channel closed"); + control_closed = true; + } + } + } + } + } } } diff --git a/crates/sequencer/src/rpc.rs b/crates/sequencer/src/rpc.rs index 53c741396f..110c7847b3 100644 --- a/crates/sequencer/src/rpc.rs +++ b/crates/sequencer/src/rpc.rs @@ -1,6 +1,6 @@ -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use alloy_eips::eip2718::Encodable2718; use alloy_eips::BlockId; @@ -10,10 +10,12 @@ use alloy_rpc_types_txpool::TxpoolContent; use citrea_common::rpc::utils::internal_rpc_error; use citrea_evm::Evm; use citrea_stf::runtime::DefaultContext; +use jsonrpsee::core::client::ClientT; use jsonrpsee::core::{RpcResult, SubscriptionResult}; +use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::proc_macros::rpc; use jsonrpsee::types::{ErrorCode, ErrorObject}; -use jsonrpsee::{PendingSubscriptionSink, SubscriptionSink}; +use jsonrpsee::{rpc_params, PendingSubscriptionSink, SubscriptionSink}; use parking_lot::Mutex; use reth_rpc::eth::EthTxBuilder; use reth_rpc_eth_types::error::EthApiError; @@ -43,6 +45,59 @@ enum BlockReceiveResult { ChannelClosed, } +/// Handle that allows the RPC layer to promote a listen-mode sequencer into a block-producing +/// sequencer at runtime. Only populated when the node is started in listen mode. +#[derive(Clone)] +pub struct ConversionHandle { + /// HTTP URL of the main sequencer this listen-mode node is following. Used to verify the main + /// sequencer is unreachable before promoting (so we don't end up with two producers). + pub sequencer_url: String, + /// Channel used to signal the listen-mode orchestrator to convert to producer. + pub convert_tx: UnboundedSender, + /// Guards against concurrent / repeated conversion attempts. + pub started: Arc, +} + +/// Number of times the main sequencer is probed before it is considered unreachable. +const MAIN_SEQUENCER_PROBE_ATTEMPTS: u32 = 5; +/// Delay between consecutive main sequencer probes. +const MAIN_SEQUENCER_PROBE_INTERVAL: Duration = Duration::from_secs(2); +/// Per-probe request timeout when checking the main sequencer. +const MAIN_SEQUENCER_PROBE_TIMEOUT: Duration = Duration::from_secs(3); + +/// Probes the main sequencer's RPC endpoint and returns `true` if it responds to **any** of the +/// probe attempts. Used to make sure the main sequencer is truly gone before a listen-mode node +/// promotes itself to producer. Requiring a single success (rather than a single failure) to call +/// it "reachable" makes the failover conservative: a transient blip will not falsely promote a +/// backup while the main sequencer is alive. +async fn main_sequencer_reachable(url: &str) -> bool { + let client = match HttpClientBuilder::default() + .request_timeout(MAIN_SEQUENCER_PROBE_TIMEOUT) + .build(url) + { + Ok(client) => client, + Err(e) => { + error!("Could not build probe client for {url}: {e}"); + return false; + } + }; + + for attempt in 1..=MAIN_SEQUENCER_PROBE_ATTEMPTS { + match client + .request::("eth_blockNumber", rpc_params![]) + .await + { + Ok(_) => return true, + Err(e) => debug!("Main sequencer probe {attempt}/{MAIN_SEQUENCER_PROBE_ATTEMPTS} failed: {e}"), + } + if attempt < MAIN_SEQUENCER_PROBE_ATTEMPTS { + tokio::time::sleep(MAIN_SEQUENCER_PROBE_INTERVAL).await; + } + } + + false +} + /// RPC context containing all the shared data needed for RPC method implementations pub struct RpcContext { /// The transaction mempool @@ -63,6 +118,8 @@ pub struct RpcContext { pub mempool_transaction_tx: broadcast::Sender, /// Broadcast receiver for mempool transaction notifications pub mempool_transaction_rx: broadcast::Receiver, + /// Handle for converting a listen-mode sequencer into a producer. `None` in producer mode. + pub conversion: Option, } /// Creates a shared RpcContext with all required data. @@ -86,6 +143,7 @@ pub fn create_rpc_context( l2_block_rx: broadcast::Receiver, mempool_transaction_tx: broadcast::Sender, mempool_transaction_rx: broadcast::Receiver, + conversion: Option, ) -> RpcContext { RpcContext { mempool, @@ -97,6 +155,7 @@ pub fn create_rpc_context( l2_block_rx, mempool_transaction_tx, mempool_transaction_rx, + conversion, } } @@ -207,6 +266,15 @@ pub trait SequencerRpc { #[method(name = "citrea_resumeCommitments")] async fn resume_commitments(&self) -> RpcResult<()>; + /// Promotes a listen-mode sequencer into a block-producing sequencer at runtime. + /// + /// This is only valid on a node started in listen mode. Before promoting, the node verifies + /// that the main sequencer it is following is unreachable (so two producers do not run at + /// once). On success the node stops its listen-mode syncers and starts producing blocks from + /// the state it has already synced. + #[method(name = "citrea_convertToProducer")] + async fn convert_to_producer(&self) -> RpcResult<()>; + /// Subscribe to Citrea events #[subscription(name = "citrea_subscribe" => "citrea_subscription", unsubscribe = "citrea_unsubscribe", item = L2BlockResponse)] async fn subscribe_citrea(&self, topic: String) -> SubscriptionResult; @@ -466,6 +534,60 @@ impl SequencerRpcServer for SequencerRpcServerImpl { }) } + /// Promote a listen-mode sequencer into a block-producing sequencer. + async fn convert_to_producer(&self) -> RpcResult<()> { + debug!("Sequencer: citrea_convertToProducer"); + + let conversion = self.context.conversion.as_ref().ok_or_else(|| { + internal_rpc_error( + "Sequencer is not running in listen mode; cannot convert to producer".to_string(), + ) + })?; + + // Guard against concurrent or repeated conversion attempts. + if conversion.started.swap(true, Ordering::SeqCst) { + return Err(internal_rpc_error( + "Conversion to producer is already in progress".to_string(), + )); + } + + // Make sure the main sequencer is actually gone before promoting, otherwise we would end up + // with two producers writing the same chain. + if main_sequencer_reachable(&conversion.sequencer_url).await { + conversion.started.store(false, Ordering::SeqCst); + return Err(internal_rpc_error( + "Main sequencer is still reachable; aborting conversion to producer".to_string(), + )); + } + + let (ack_tx, ack_rx) = tokio::sync::oneshot::channel(); + if let Err(e) = conversion + .convert_tx + .send(SequencerRpcMessage::ConvertToProducer { ack: ack_tx }) + { + conversion.started.store(false, Ordering::SeqCst); + return Err(internal_rpc_error(format!( + "Could not send convert-to-producer signal: {e}" + ))); + } + + match ack_rx.await { + Ok(Ok(())) => { + debug!("Sequencer: conversion to producer accepted"); + Ok(()) + } + Ok(Err(reason)) => { + conversion.started.store(false, Ordering::SeqCst); + Err(internal_rpc_error(format!( + "Conversion to producer failed: {reason}" + ))) + } + Err(e) => Err(internal_rpc_error(format!( + "Conversion to producer ack channel closed: {e}" + ))), + } + } + /// Subscribe to Citrea events async fn subscribe_citrea( &self, diff --git a/crates/sequencer/src/runner.rs b/crates/sequencer/src/runner.rs index e6a1f8353d..074890e556 100644 --- a/crates/sequencer/src/runner.rs +++ b/crates/sequencer/src/runner.rs @@ -1268,6 +1268,13 @@ where info!("Sequencer: Resumed commitments via RPC"); } }, + Some(SequencerRpcMessage::ConvertToProducer { ack }) => { + // This sequencer is already producing blocks. Conversion is only valid + // for a listen-mode sequencer, so reject it here. + let _ = ack.send(Err( + "Sequencer is already running in producer mode".to_string(), + )); + }, None => { // Channel closed warn!("RPC message channel closed"); diff --git a/crates/sequencer/src/types.rs b/crates/sequencer/src/types.rs index 53c6d663f9..2adf47f343 100644 --- a/crates/sequencer/src/types.rs +++ b/crates/sequencer/src/types.rs @@ -7,4 +7,13 @@ pub enum SequencerRpcMessage { HaltCommitments, /// Resume sequencer commitments ResumeCommitments, + /// Convert a listen-mode sequencer into a block-producing sequencer. + /// + /// Only handled by the listen-mode orchestrator. The `ack` channel is used to report the + /// outcome of the conversion (e.g. success, or a reason it was rejected) back to the RPC + /// caller. + ConvertToProducer { + /// Channel used to report the conversion outcome back to the RPC handler. + ack: tokio::sync::oneshot::Sender>, + }, } From af6b8950774f88973f79b37be9018a22f0a5e14f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erce=20Can=20Bekt=C3=BCre?= Date: Fri, 5 Jun 2026 13:44:10 +0300 Subject: [PATCH 2/8] Add shutdown signal to listen mode test spawner --- bin/citrea/tests/common/helpers.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/bin/citrea/tests/common/helpers.rs b/bin/citrea/tests/common/helpers.rs index 0513c0f2ed..884ae53c3e 100644 --- a/bin/citrea/tests/common/helpers.rs +++ b/bin/citrea/tests/common/helpers.rs @@ -289,8 +289,12 @@ pub async fn start_rollup( ); task_executor.spawn_critical_with_graceful_shutdown_signal( "ListenModeSequencer", - |_| async move { - listen_mode_sequencer.run().instrument(span).await.unwrap(); + |shutdown_signal| async move { + listen_mode_sequencer + .run(shutdown_signal) + .instrument(span) + .await + .unwrap(); }, ); } From ab2d463170190f4988e3680d13b061ea34ff5384 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erce=20Can=20Bekt=C3=BCre?= Date: Fri, 5 Jun 2026 13:47:32 +0300 Subject: [PATCH 3/8] Fix retry infinite loop --- .../src/listen_mode/mempool_syncer.rs | 53 ++++++++++++------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/crates/sequencer/src/listen_mode/mempool_syncer.rs b/crates/sequencer/src/listen_mode/mempool_syncer.rs index 457f5c09d3..45fb23d8ba 100644 --- a/crates/sequencer/src/listen_mode/mempool_syncer.rs +++ b/crates/sequencer/src/listen_mode/mempool_syncer.rs @@ -69,26 +69,41 @@ where } /// Runs the subscription task for mempool transaction updates - pub async fn run_subscription_task(&self, shutdown_signal: GracefulShutdown) { + pub async fn run_subscription_task(&self, mut shutdown_signal: GracefulShutdown) { loop { - let exponential_backoff = ExponentialBackoff::default(); - let _ = retry_backoff(exponential_backoff, || async { - subscribe_to_mempool_transaction_updates( - &self.sequencer_ws_endpoint, - self.transactions_buffer.clone(), - self.transactions_to_remove_buffer.clone(), - shutdown_signal.clone(), - ) - .await - .map_err(|e| { - error!("Subscription error: {}", e); - backoff::Error::Transient { - err: e, - retry_after: None, - } - }) - }) - .await; + // `inner_shutdown` is used by the subscription itself; the original `shutdown_signal` + // breaks the retry loop on shutdown. This matters because when the sequencer is gone the + // WS connect fails *before* the inner shutdown check is reached, so without this guard + // the retry loop would spin forever and never observe shutdown (which in turn would + // block a listen->producer conversion from completing). + let inner_shutdown = shutdown_signal.clone(); + tokio::select! { + biased; + _ = &mut shutdown_signal => { + info!("Shutting down mempool subscription task"); + return; + } + _ = async { + let exponential_backoff = ExponentialBackoff::default(); + let _ = retry_backoff(exponential_backoff, || async { + subscribe_to_mempool_transaction_updates( + &self.sequencer_ws_endpoint, + self.transactions_buffer.clone(), + self.transactions_to_remove_buffer.clone(), + inner_shutdown.clone(), + ) + .await + .map_err(|e| { + error!("Subscription error: {}", e); + backoff::Error::Transient { + err: e, + retry_after: None, + } + }) + }) + .await; + } => {} + } } } From e258562a6d0b5cb47da1247ae5b79f239b54573e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erce=20Can=20Bekt=C3=BCre?= Date: Fri, 5 Jun 2026 13:48:09 +0300 Subject: [PATCH 4/8] Timeout on l1, l2 and mempool syncers to shutdown before conversion --- crates/sequencer/src/listen_mode/mod.rs | 32 ++++++++++++++++++------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/crates/sequencer/src/listen_mode/mod.rs b/crates/sequencer/src/listen_mode/mod.rs index a5e46d4c5a..d5cc13ce0d 100644 --- a/crates/sequencer/src/listen_mode/mod.rs +++ b/crates/sequencer/src/listen_mode/mod.rs @@ -54,6 +54,10 @@ use crate::CitreaSequencer; use super::metrics::SEQUENCER_METRICS as SM; +/// Maximum time to wait for the listen-mode syncers to stop before a listen->producer conversion +/// proceeds anyway. Bounds the conversion so a misbehaving syncer can never hang it indefinitely. +const SYNCER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30); + /// Module for syncing and storing sequencer commitments extracted from L1 blocks. pub(crate) mod l1_syncer; /// Module containing mempool synchronization functionality for listen mode sequencer @@ -281,7 +285,10 @@ where _ = &mut shutdown_signal => { info!("Shutting down listen mode sequencer"); if let Some(manager) = syncer_manager.take() { - let _ = tokio::task::spawn_blocking(move || manager.graceful_shutdown()).await; + let _ = tokio::task::spawn_blocking(move || { + manager.graceful_shutdown_with_timeout(SYNCER_SHUTDOWN_TIMEOUT) + }) + .await; } return Ok(()); } @@ -291,15 +298,24 @@ where info!("Listen mode sequencer: received convert-to-producer signal"); // Stop the syncers and wait for them to fully drain so the producer - // starts from the final head state with no competing writer. + // starts from the final head state with no competing writer. Bounded by + // a timeout so a syncer that fails to stop can never hang the conversion. if let Some(manager) = syncer_manager.take() { - if let Err(e) = - tokio::task::spawn_blocking(move || manager.graceful_shutdown()) - .await + match tokio::task::spawn_blocking(move || { + manager.graceful_shutdown_with_timeout(SYNCER_SHUTDOWN_TIMEOUT) + }) + .await { - let msg = format!("failed to stop listen-mode syncers: {e}"); - let _ = ack.send(Err(msg.clone())); - return Err(anyhow::anyhow!(msg)); + Ok(true) => {} + Ok(false) => warn!( + "Listen mode sequencer: syncers did not stop within timeout; \ + proceeding with conversion" + ), + Err(e) => { + let msg = format!("failed to stop listen-mode syncers: {e}"); + let _ = ack.send(Err(msg.clone())); + return Err(anyhow::anyhow!(msg)); + } } } From 841b905682c368ed0874e37633a71b94399aa90d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erce=20Can=20Bekt=C3=BCre?= Date: Fri, 5 Jun 2026 13:48:18 +0300 Subject: [PATCH 5/8] E2e rpc toggle test --- .../tests/bitcoin/listen_mode_sequencer.rs | 441 ++++++++++++++++++ 1 file changed, 441 insertions(+) diff --git a/bin/citrea/tests/bitcoin/listen_mode_sequencer.rs b/bin/citrea/tests/bitcoin/listen_mode_sequencer.rs index 804054f0cd..1c7f54732c 100644 --- a/bin/citrea/tests/bitcoin/listen_mode_sequencer.rs +++ b/bin/citrea/tests/bitcoin/listen_mode_sequencer.rs @@ -450,6 +450,447 @@ async fn read_only_sequencer_test() -> Result<()> { .await } +struct ReadOnlySequencerRpcToggleTest; + +/* +Same as `ReadOnlySequencerTest` through step 11, but instead of reviving the read-only sequencer by +restarting it with the main sequencer's config, it is promoted to producer **in-process** via the +`citrea_convertToProducer` RPC (no restart, no address takeover). + +1. Start a sequencer cluster with 2 sequencers +2. Configure one sequencer as a read-only sequencer +3. Send some L2 blocks to the sequencer with some transactions +4. Verify that the read-only sequencer can fetch the L2 blocks from the main sequencer +5. Send a commitment from the main sequencer +6. Verify that the read-only sequencer can fetch the commitment after the commitment is finalized but not before +7. Open a full node and sync to the main sequencer and see commitments +8. Publish more L2 blocks and verify that the read-only sequencer can fetch them +9. Send another commitment from the main sequencer +10. Verify that the read-only sequencer cannot fetch it because it is not finalized yet +11. Shut Down main sequencer and full node. +12. Call the RPC of the read-only sequencer to toggle from listen mode to producer +13. Verify that after toggle the read-only sequencer does have the non-finalized commitment +14. Publish more l2 blocks from the toggled sequencer with transactions +15. Restart full node pointed at the toggled sequencer's url +16. See that full node can sync properly +17. Check the read-only sequencer historical state works as intended +18. Send commitment from toggled sequencer and get it finalized +19. Verify that full node can fetch the finalized commitment and verify it +20. Roll back the toggled sequencer and full node to a previous state +21. Publish more l2 blocks and still see full node can sync with toggled sequencer +*/ +#[async_trait] +impl TestCase for ReadOnlySequencerRpcToggleTest { + fn test_config() -> TestCaseConfig { + TestCaseConfig { + n_nodes: HashMap::from([(NodeKind::Sequencer, 2)]), + with_sequencer: true, + with_full_node: true, + with_citrea_cli: true, + ..Default::default() + } + } + + fn scan_l1_start_height() -> Option { + Some(147) + } + + async fn run_test(&mut self, f: &mut TestFramework) -> Result<()> { + let Some(cluster) = &mut f.sequencer_cluster else { + anyhow::bail!("Sequencer cluster not running. Set n_nodes with Sequencer to 2 or more") + }; + + let mut cluster_iter = cluster.iter_mut(); + let sequencer = cluster_iter.next().unwrap(); + let readonly_sequencer = cluster_iter.next().unwrap(); + + let full_node = f.full_node.as_mut().unwrap(); + + let da = f.bitcoin_nodes.get_mut(0).unwrap(); + + let sequ_host = sequencer.config.clone().rollup.rpc.bind_host; + let sequ_port = sequencer.config.clone().rollup.rpc.bind_port; + + let seq_test_client = + make_test_client(SocketAddr::new(sequ_host.parse()?, sequ_port)).await?; + + let max_l2_blocks_per_commitment = sequencer.config.node.max_l2_blocks_per_commitment; + + let some_address = Address::random(); + + for _ in 0..max_l2_blocks_per_commitment / 2 { + let _ = seq_test_client + .send_eth(some_address, None, None, None, 1e18 as u128) + .await + .unwrap(); + sequencer.client.send_publish_batch_request().await?; + } + let head_l2_height = sequencer + .client + .http_client() + .get_head_l2_block_height() + .await?; + + // Wait for the readonly sequencer to catch up + readonly_sequencer + .wait_for_l2_height(head_l2_height.to::(), None) + .await?; + + // Fetch all l2 blocks and compare them + let l2_blocks = readonly_sequencer + .client + .http_client() + .get_l2_block_range(U64::from(1), head_l2_height) + .await + .unwrap(); + + let sequencer_rpc_blocks = sequencer + .client + .http_client() + .get_l2_block_range(U64::from(1), head_l2_height) + .await + .unwrap(); + + for (sequ_block, readonly_block) in sequencer_rpc_blocks.iter().zip(l2_blocks) { + assert_eq!(*sequ_block, readonly_block); + for (sequ_tx, readonly_tx) in readonly_block + .as_ref() + .unwrap() + .txs + .iter() + .zip(readonly_block.as_ref().unwrap().txs.iter()) + { + assert_eq!(sequ_tx, readonly_tx); + } + } + + for _ in 0..max_l2_blocks_per_commitment / 2 { + sequencer.client.send_publish_batch_request().await?; + } + + // Expect sequencer to send commitment + da.wait_mempool_len(2, None).await?; + da.generate(1).await?; + + let sequencers_commitment = sequencer + .client + .http_client() + .get_sequencer_commitment_by_index(U32::from(1)) + .await?; + assert!(sequencers_commitment.is_some()); + let readonly_commitment = readonly_sequencer + .client + .http_client() + .get_sequencer_commitment_by_index(U32::from(1)) + .await?; + assert!(readonly_commitment.is_none()); + + // Now that it is finalized, readonly sequencer should be able to fetch it + da.generate(DEFAULT_FINALITY_DEPTH - 1).await?; + + let finalized_height = da.get_finalized_height(None).await?; + + // Wait for the readonly sequencer l1 syncer to catch up + readonly_sequencer + .wait_for_l1_height(finalized_height, None) + .await?; + + let readonly_commitment = readonly_sequencer + .client + .http_client() + .get_sequencer_commitment_by_index(U32::from(1)) + .await?; + // Now should have the commitment + assert!(readonly_commitment.is_some()); + + full_node.wait_for_l1_height(finalized_height, None).await?; + let full_node_commitment = full_node + .client + .http_client() + .get_sequencer_commitment_by_index(U32::from(1)) + .await?; + // Full node should also have the commitment + assert!(full_node_commitment.is_some()); + + // Now publish more l2 blocks for another commitment + for _ in 0..max_l2_blocks_per_commitment { + sequencer.client.send_publish_batch_request().await?; + } + + let head_l2_height = sequencer + .client + .http_client() + .get_head_l2_block_height() + .await?; + + readonly_sequencer + .wait_for_l2_height(head_l2_height.to::(), None) + .await?; + + full_node + .wait_for_l2_height(head_l2_height.to::(), None) + .await?; + + // Expect sequencer to send commitment + da.wait_mempool_len(2, None).await?; + + // While the commitment is still in mempool, shut down the main sequencer and full node, then + // toggle the read-only sequencer to producer in-process via RPC (no restart). + sequencer.wait_until_stopped().await?; + full_node.wait_until_stopped().await?; + + sleep(std::time::Duration::from_secs(2)).await; + + // Promote the read-only sequencer to producer. The convert handler probes the (now-dead) + // main sequencer, sees it is unreachable, and starts producing from the synced state. The + // read-only sequencer keeps its own rpc address, so its client needs no repointing. + readonly_sequencer + .client + .http_client() + .convert_to_producer() + .await?; + + sleep(std::time::Duration::from_secs(2)).await; + + let readonly_sequencer_test_client = make_test_client(SocketAddr::new( + readonly_sequencer.config.rollup.rpc.bind_host.parse()?, + readonly_sequencer.config.rollup.rpc.bind_port, + )) + .await?; + // Now the readonly sequencer is the main sequencer + // Publish some blocks from the toggled sequencer + for _ in 0..max_l2_blocks_per_commitment / 2 { + let _ = readonly_sequencer_test_client + .send_eth(some_address, None, None, None, 1e18 as u128) + .await + .unwrap(); + readonly_sequencer + .client + .send_publish_batch_request() + .await?; + } + + // Start the full node pointed at the toggled sequencer's url (it kept its own address, so + // unlike the restart-based test it did not take over the main sequencer's address). + let mut full_node_config = full_node.config.clone(); + if let Some(runner) = full_node_config.rollup.runner.as_mut() { + runner.sequencer_client_url = format!( + "http://{}:{}", + readonly_sequencer.config.rollup.rpc.bind_host, + readonly_sequencer.config.rollup.rpc.bind_port, + ); + } + full_node.start(Some(full_node_config), None).await?; + + let head_l2_height = readonly_sequencer + .client + .http_client() + .get_head_l2_block_height() + .await?; + + // Wait for full node to sync with the toggled sequencer + full_node + .wait_for_l2_height(head_l2_height.to::(), None) + .await?; + + // Check the balance of the address + let balance = readonly_sequencer_test_client + .eth_get_balance(some_address, None) + .await + .unwrap(); + assert!(balance == U256::from(max_l2_blocks_per_commitment as u128 * 1e18 as u128)); + + // Check the balance of the address from the readonly sequencer + let readonly_balance = readonly_sequencer_test_client + .eth_get_balance(some_address, None) + .await + .unwrap(); + assert!( + readonly_balance == U256::from(max_l2_blocks_per_commitment as u128 * 1e18 as u128) + ); + + // Check the historical balance of the address before it was toggled + let historical_balance = readonly_sequencer_test_client + .eth_get_balance(some_address, Some(BlockId::earliest())) + .await + .unwrap(); + assert!(historical_balance == U256::from(0)); + let historical_balance = readonly_sequencer_test_client + .eth_get_balance(some_address, Some(BlockId::number(1))) + .await + .unwrap(); + assert!(historical_balance == U256::from(1e18 as u128)); + let historical_balance = readonly_sequencer_test_client + .eth_get_balance(some_address, Some(BlockId::number(2))) + .await + .unwrap(); + assert!(historical_balance == U256::from(2e18 as u128)); + + // Also see that the toggled sequencer can see the non-finalized commitment + let toggled_commitment = readonly_sequencer + .client + .http_client() + .get_sequencer_commitment_by_index(U32::from(2)) + .await?; + assert!(toggled_commitment.is_some()); + + // Now finalize the commitment and also see that full node can fetch it as well + da.generate(DEFAULT_FINALITY_DEPTH).await?; + let finalized_height = da.get_finalized_height(None).await?; + + full_node.wait_for_l1_height(finalized_height, None).await?; + + let full_node_commitment = full_node + .client + .http_client() + .get_sequencer_commitment_by_index(U32::from(2)) + .await?; + assert!(full_node_commitment.is_some()); + + // Now publish more l2 blocks and see that toggled sequencer can send commitments + for _ in 0..max_l2_blocks_per_commitment { + readonly_sequencer + .client + .send_publish_batch_request() + .await?; + } + + // Expect sequencer to send commitment + da.wait_mempool_len(2, None).await?; + da.generate(DEFAULT_FINALITY_DEPTH).await?; + + let new_commitment = readonly_sequencer + .client + .http_client() + .get_sequencer_commitment_by_index(U32::from(3)) + .await?; + assert!(new_commitment.is_some()); + + // Wait for full node to sync with the new commitment + full_node + .wait_for_l1_height(finalized_height + DEFAULT_FINALITY_DEPTH, None) + .await?; + + let new_full_node_commitment = full_node + .client + .http_client() + .get_sequencer_commitment_by_index(U32::from(3)) + .await?; + assert!(new_full_node_commitment.is_some()); + + assert_eq!( + new_commitment.unwrap().merkle_root, + new_full_node_commitment.unwrap().merkle_root + ); + + // Stop the readonly sequencer and full node + readonly_sequencer.wait_until_stopped().await?; + full_node.wait_until_stopped().await?; + + // Rollback bitcoin to initial height and drop existing txs so that we can re-send them out of order + let initial_height_hash = da.get_block_hash(f.initial_da_height + 1).await?; + da.invalidate_block(&initial_height_hash).await?; + + let citrea_cli = f.citrea_cli.as_ref().unwrap(); + // Rollback the toggled sequencer, full node and da to a previous state + citrea_cli + .run( + "rollback", + &[ + "--node-type", + "sequencer", + "--db-path", + readonly_sequencer + .config + .rollup + .storage + .path + .to_str() + .unwrap(), + "--l2-target", + "1", + "--l1-target", + "120", + "--sequencer-commitment-index", + "0", + ], + ) + .await?; + + citrea_cli + .run( + "rollback", + &[ + "--node-type", + "full-node", + "--db-path", + full_node.config.rollup.storage.path.to_str().unwrap(), + "--l2-target", + "1", + "--l1-target", + "120", + "--sequencer-commitment-index", + "0", + ], + ) + .await?; + + // Restart the toggled sequencer as a producer. The in-process toggle did not change the + // harness config, which still has `listen_mode_config` set, so clear it to come back up + // producing instead of as a follower. + let mut producer_config = readonly_sequencer.config.clone(); + producer_config.node.listen_mode_config = None; + readonly_sequencer.start(Some(producer_config), None).await?; + sleep(std::time::Duration::from_secs(2)).await; + full_node.start(None, None).await?; + sleep(std::time::Duration::from_secs(2)).await; + + // Check the head l2 heights are the same + let readonly_head_l2_height = readonly_sequencer + .client + .http_client() + .get_head_l2_block_height() + .await?; + let full_node_head_l2_height = full_node + .client + .http_client() + .get_head_l2_block_height() + .await?; + assert_eq!(readonly_head_l2_height, full_node_head_l2_height); + + // Publish more l2 blocks and see that full node can still sync with toggled sequencer + for _ in 0..max_l2_blocks_per_commitment { + readonly_sequencer + .client + .send_publish_batch_request() + .await?; + } + + // Expect sequencer to send commitment + da.wait_mempool_len(2, None).await?; + da.generate(DEFAULT_FINALITY_DEPTH).await?; + + // Check that full node can fetch the new commitment + let new_commitment = readonly_sequencer + .client + .http_client() + .get_sequencer_commitment_by_index(U32::from(1)) + .await?; + + assert!(new_commitment.is_some()); + + Ok(()) + } +} + +#[tokio::test] +async fn read_only_sequencer_rpc_toggle_test() -> Result<()> { + TestCaseRunner::new(ReadOnlySequencerRpcToggleTest) + .set_citrea_path(get_citrea_path()) + .run() + .await +} + /// Test listen mode sequencer with sync_blocks_count = 0 (subscription only) struct SubscriptionOnlyTest; From 7a0e8703242a1274be7dd05ba4901ec8e5455788 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erce=20Can=20Bekt=C3=BCre?= Date: Fri, 5 Jun 2026 14:10:55 +0300 Subject: [PATCH 6/8] Lint fmt --- bin/citrea/src/rollup/mod.rs | 3 +-- bin/citrea/tests/bitcoin/listen_mode_sequencer.rs | 4 +++- crates/sequencer/src/listen_mode/mod.rs | 6 +++--- crates/sequencer/src/rpc.rs | 4 +++- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/bin/citrea/src/rollup/mod.rs b/bin/citrea/src/rollup/mod.rs index 0c00862ec6..60c051f4a5 100644 --- a/bin/citrea/src/rollup/mod.rs +++ b/bin/citrea/src/rollup/mod.rs @@ -456,8 +456,7 @@ pub trait CitreaRollupBlueprint: RollupBlueprint { if ledger_db.get_head_l2_block()?.is_some() { // At least one l2 block was processed: derive params from the persisted state. - let init_params = - citrea_common::read_init_params_from_db(ledger_db, storage_manager)?; + let init_params = citrea_common::read_init_params_from_db(ledger_db, storage_manager)?; info!( "Initialize node. State root: 0x{}. Last l2 block hash: 0x{}.", hex::encode(init_params.prev_state_root), diff --git a/bin/citrea/tests/bitcoin/listen_mode_sequencer.rs b/bin/citrea/tests/bitcoin/listen_mode_sequencer.rs index 1c7f54732c..3191cc0041 100644 --- a/bin/citrea/tests/bitcoin/listen_mode_sequencer.rs +++ b/bin/citrea/tests/bitcoin/listen_mode_sequencer.rs @@ -840,7 +840,9 @@ impl TestCase for ReadOnlySequencerRpcToggleTest { // producing instead of as a follower. let mut producer_config = readonly_sequencer.config.clone(); producer_config.node.listen_mode_config = None; - readonly_sequencer.start(Some(producer_config), None).await?; + readonly_sequencer + .start(Some(producer_config), None) + .await?; sleep(std::time::Duration::from_secs(2)).await; full_node.start(None, None).await?; sleep(std::time::Duration::from_secs(2)).await; diff --git a/crates/sequencer/src/listen_mode/mod.rs b/crates/sequencer/src/listen_mode/mod.rs index d5cc13ce0d..158b422b3c 100644 --- a/crates/sequencer/src/listen_mode/mod.rs +++ b/crates/sequencer/src/listen_mode/mod.rs @@ -46,14 +46,13 @@ use tokio::sync::broadcast; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tracing::{info, warn}; +use super::metrics::SEQUENCER_METRICS as SM; use crate::db_provider::DbProvider; use crate::deposit_data_mempool::DepositDataMempool; use crate::mempool::CitreaMempool; use crate::types::SequencerRpcMessage; use crate::CitreaSequencer; -use super::metrics::SEQUENCER_METRICS as SM; - /// Maximum time to wait for the listen-mode syncers to stop before a listen->producer conversion /// proceeds anyway. Bounds the conversion so a misbehaving syncer can never hang it indefinitely. const SYNCER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30); @@ -140,7 +139,8 @@ fn build_producer(parts: ProducerParts) -> anyhow::Result>::new(); + let native_stf = + StfBlueprint::>::new(); let init_params = read_init_params_from_db(&parts.ledger_db, &parts.storage_manager)?; CitreaSequencer::new( diff --git a/crates/sequencer/src/rpc.rs b/crates/sequencer/src/rpc.rs index 110c7847b3..23d7741d52 100644 --- a/crates/sequencer/src/rpc.rs +++ b/crates/sequencer/src/rpc.rs @@ -88,7 +88,9 @@ async fn main_sequencer_reachable(url: &str) -> bool { .await { Ok(_) => return true, - Err(e) => debug!("Main sequencer probe {attempt}/{MAIN_SEQUENCER_PROBE_ATTEMPTS} failed: {e}"), + Err(e) => { + debug!("Main sequencer probe {attempt}/{MAIN_SEQUENCER_PROBE_ATTEMPTS} failed: {e}") + } } if attempt < MAIN_SEQUENCER_PROBE_ATTEMPTS { tokio::time::sleep(MAIN_SEQUENCER_PROBE_INTERVAL).await; From e07be4a99e517da2502255426ceaf2947c1940b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erce=20Can=20Bekt=C3=BCre?= Date: Fri, 5 Jun 2026 14:38:05 +0300 Subject: [PATCH 7/8] Test rpc idempotency and split brain guards --- .../tests/bitcoin/listen_mode_sequencer.rs | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/bin/citrea/tests/bitcoin/listen_mode_sequencer.rs b/bin/citrea/tests/bitcoin/listen_mode_sequencer.rs index 3191cc0041..3b6b693cde 100644 --- a/bin/citrea/tests/bitcoin/listen_mode_sequencer.rs +++ b/bin/citrea/tests/bitcoin/listen_mode_sequencer.rs @@ -634,6 +634,33 @@ impl TestCase for ReadOnlySequencerRpcToggleTest { // Expect sequencer to send commitment da.wait_mempool_len(2, None).await?; + // Edge case: converting a sequencer that is already a producer (not in listen mode) must be + // rejected. + let err = sequencer + .client + .http_client() + .convert_to_producer() + .await + .expect_err("converting a producer sequencer should fail"); + assert!( + err.to_string().to_lowercase().contains("listen mode"), + "expected a 'not in listen mode' error, got: {err}" + ); + + // Edge case: converting the read-only sequencer while the main sequencer is still reachable + // must be rejected (split-brain guard). It must also leave the node convertible later, which + // the real conversion below verifies. + let err = readonly_sequencer + .client + .http_client() + .convert_to_producer() + .await + .expect_err("converting while the main sequencer is alive should fail"); + assert!( + err.to_string().to_lowercase().contains("reachable"), + "expected a 'main sequencer still reachable' error, got: {err}" + ); + // While the commitment is still in mempool, shut down the main sequencer and full node, then // toggle the read-only sequencer to producer in-process via RPC (no restart). sequencer.wait_until_stopped().await?; @@ -652,6 +679,19 @@ impl TestCase for ReadOnlySequencerRpcToggleTest { sleep(std::time::Duration::from_secs(2)).await; + // Edge case: calling convert again on the now-producer node is rejected by the idempotency + // guard rather than starting a second conversion. + let err = readonly_sequencer + .client + .http_client() + .convert_to_producer() + .await + .expect_err("a second conversion should fail"); + assert!( + err.to_string().to_lowercase().contains("already in progress"), + "expected an 'already in progress' error, got: {err}" + ); + let readonly_sequencer_test_client = make_test_client(SocketAddr::new( readonly_sequencer.config.rollup.rpc.bind_host.parse()?, readonly_sequencer.config.rollup.rpc.bind_port, From 1cb88914ae57e57af5e78bb2704880e31510220a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erce=20Can=20Bekt=C3=BCre?= Date: Fri, 5 Jun 2026 14:43:52 +0300 Subject: [PATCH 8/8] Lint --- bin/citrea/tests/bitcoin/listen_mode_sequencer.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bin/citrea/tests/bitcoin/listen_mode_sequencer.rs b/bin/citrea/tests/bitcoin/listen_mode_sequencer.rs index 3b6b693cde..acfa3ddb5e 100644 --- a/bin/citrea/tests/bitcoin/listen_mode_sequencer.rs +++ b/bin/citrea/tests/bitcoin/listen_mode_sequencer.rs @@ -688,7 +688,9 @@ impl TestCase for ReadOnlySequencerRpcToggleTest { .await .expect_err("a second conversion should fail"); assert!( - err.to_string().to_lowercase().contains("already in progress"), + err.to_string() + .to_lowercase() + .contains("already in progress"), "expected an 'already in progress' error, got: {err}" );