diff --git a/crates/asm/worker/src/service.rs b/crates/asm/worker/src/service.rs index 8725c3330d..ea19c16a6e 100644 --- a/crates/asm/worker/src/service.rs +++ b/crates/asm/worker/src/service.rs @@ -1,6 +1,6 @@ //! Service framework integration for ASM. -use std::marker; +use std::{marker, thread::sleep, time::Duration}; use bitcoin::hashes::Hash; use serde::{Deserialize, Serialize}; @@ -10,7 +10,7 @@ use strata_service::{Response, Service, SyncService}; use strata_state::asm_state::AsmState; use tracing::*; -use crate::{AsmWorkerServiceState, traits::WorkerContext}; +use crate::{AsmWorkerServiceState, WorkerError, WorkerResult, traits::WorkerContext}; /// ASM service implementation using the service framework. #[derive(Debug)] @@ -65,7 +65,7 @@ impl SyncService for AsmWorkerService< let mut pivot_anchor = ctx.get_anchor_state(&pivot_block); while pivot_anchor.is_err() && pivot_block.height() >= genesis_height { - let block = ctx.get_l1_block(pivot_block.blkid())?; + let block = get_l1_block_with_retry(ctx, pivot_block.blkid())?; let parent_height = pivot_block.height() - 1; let parent_block_id = L1BlockCommitment::new(parent_height, block.header.prev_blockhash.to_l1_block_id()); @@ -107,7 +107,7 @@ impl SyncService for AsmWorkerService< ); let _genesis_guard = genesis_span.enter(); // Fetch the genesis block (should work now since L1 reader processed it) - let genesis_block = ctx.get_l1_block(pivot_block.blkid())?; + let genesis_block = get_l1_block_with_retry(ctx, pivot_block.blkid())?; // Compute wtxids_root and create manifest let wtxids_root: strata_primitives::Buf32 = genesis_block @@ -183,6 +183,40 @@ impl SyncService for AsmWorkerService< } } +/// Fetches an L1 block, retrying on transient [`WorkerError::MissingL1Block`] errors. +/// +/// The L1 reader may notify the ASM worker before the block data is fully +/// available (e.g. canonical-chain DB write hasn't propagated yet, or the +/// Bitcoin RPC times out under load). This bridges the gap with exponential +/// backoff: 200 ms base, 1.5x growth, 2 s cap, 10 retries (~10 s total). +fn get_l1_block_with_retry( + ctx: &W, + blockid: &L1BlockId, +) -> WorkerResult { + const MAX_RETRIES: u32 = 10; + const BASE_DELAY_MS: u64 = 200; + const MAX_DELAY_MS: u64 = 2000; + + let mut delay_ms = BASE_DELAY_MS; + for attempt in 0..=MAX_RETRIES { + match ctx.get_l1_block(blockid) { + Ok(block) => return Ok(block), + Err(WorkerError::MissingL1Block(id)) if attempt < MAX_RETRIES => { + warn!( + attempt, + ?id, + delay_ms, + "L1 block not yet available, retrying" + ); + sleep(Duration::from_millis(delay_ms)); + delay_ms = (delay_ms * 3 / 2).min(MAX_DELAY_MS); + } + Err(e) => return Err(e), + } + } + unreachable!() +} + /// Status information for the ASM worker service. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AsmWorkerStatus { diff --git a/functional-tests-new/common/services/strata.py b/functional-tests-new/common/services/strata.py index 1bc2ca072b..9940f8ac11 100644 --- a/functional-tests-new/common/services/strata.py +++ b/functional-tests-new/common/services/strata.py @@ -196,6 +196,43 @@ def wait_for_additional_blocks( ) return self.get_cur_block_height(rpc) + def wait_for_l1_commitment_at( + self, + height: int, + rpc: JsonRpcClient | None = None, + timeout: int = 60, + poll_interval: float = 0.5, + differs_from: object | None = None, + ) -> object: + """Wait for an L1 header commitment at a given height. + + Args: + height: L1 block height to check. + rpc: Optional RPC client. If None, creates a new one. + timeout: Maximum time to wait in seconds. + poll_interval: How often to poll. + differs_from: If set, also require the commitment to differ + from this value (useful for reorg detection). + + Returns: + The L1 header commitment value. + """ + if rpc is None: + rpc = self.create_rpc() + + def predicate(v): + if v is None: + return False + return differs_from is None or v != differs_from + + return wait_until_with_value( + lambda: rpc.strata_getL1HeaderCommitment(height), + predicate, + error_with=f"No L1 header commitment at height {height}", + timeout=timeout, + step=poll_interval, + ) + def check_block_generation_in_range(self, rpc: JsonRpcClient, start: int, end: int) -> int: """Checks for range of blocks produced and returns current block height""" logger.info(f"Waiting for blocks from {start} to {end} be produced...") diff --git a/functional-tests-new/entry.py b/functional-tests-new/entry.py index f8befdbe92..deb8588b8e 100755 --- a/functional-tests-new/entry.py +++ b/functional-tests-new/entry.py @@ -43,7 +43,11 @@ def disabled_tests() -> frozenset[str]: Can be extended via DISABLED_TESTS env var (comma-separated). """ base_disabled = frozenset( - ["keepalive_stub_test", "revert_ol_state_fn", "revert_checkpointed_block_fn"] + [ + "keepalive_stub_test", + "revert_ol_state_fn", + "revert_checkpointed_block_fn", + ] ) env_disabled = os.getenv("DISABLED_TESTS", "") diff --git a/functional-tests-new/tests/btcio/__init__.py b/functional-tests-new/tests/btcio/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/functional-tests-new/tests/btcio/test_l1_connected.py b/functional-tests-new/tests/btcio/test_l1_connected.py new file mode 100644 index 0000000000..5f52fc5359 --- /dev/null +++ b/functional-tests-new/tests/btcio/test_l1_connected.py @@ -0,0 +1,46 @@ +"""Test that strata is connected to Bitcoin and tracking L1 blocks.""" + +import logging + +import flexitest + +from common.base_test import StrataNodeTest +from common.config import ServiceType + +logger = logging.getLogger(__name__) + + +@flexitest.register +class TestL1Connected(StrataNodeTest): + """Verify strata can see L1 blocks. + + The basic env pre-generates 110 Bitcoin blocks before starting strata. + After strata starts, it should have L1 header commitments for those + blocks. We check that the genesis L1 height has a commitment, which + proves the L1 reader is connected and processing blocks. + + Replaces old: btcio_connect.py (strata_l1connected) + """ + + def __init__(self, ctx: flexitest.InitContext): + ctx.set_env("basic") + + def main(self, ctx): + strata = self.get_service(ServiceType.Strata) + bitcoin = self.get_service(ServiceType.Bitcoin) + + rpc = strata.wait_for_rpc_ready(timeout=30) + btc_rpc = bitcoin.create_rpc() + + # The basic env pre-generates 110 blocks. The genesis L1 height + # equals the Bitcoin tip at the time strata started (~110). + # The ASM only creates manifests for heights >= genesis, so we + # check the genesis height itself. + chain_info = btc_rpc.proxy.getblockchaininfo() + tip_height = chain_info["blocks"] + logger.info(f"Bitcoin tip (genesis L1 height): {tip_height}") + + commitment = strata.wait_for_l1_commitment_at(tip_height, rpc=rpc, timeout=60) + + logger.info(f"L1 header commitment at {tip_height}: {commitment}") + return True diff --git a/functional-tests-new/tests/btcio/test_l1_reorg.py b/functional-tests-new/tests/btcio/test_l1_reorg.py new file mode 100644 index 0000000000..733939b9ca --- /dev/null +++ b/functional-tests-new/tests/btcio/test_l1_reorg.py @@ -0,0 +1,104 @@ +"""Test that strata handles Bitcoin L1 chain reorganizations.""" + +import logging + +import flexitest + +from common.base_test import StrataNodeTest +from common.config import ServiceType +from envconfigs.strata import StrataEnvConfig + +logger = logging.getLogger(__name__) + +# How many blocks above genesis to mine before triggering reorg. +EXTRA_BLOCKS = 6 +# How many of those extra blocks to invalidate. +REORG_DEPTH = 3 + + +@flexitest.register +class TestL1Reorg(StrataNodeTest): + """Verify strata detects and handles L1 block reorganizations. + + Mines blocks above genesis so the ASM has manifests to compare, + then invalidates some of those blocks, mines replacements, and + checks that strata updates its L1 header commitments. + + Replaces old: btcio_read_reorg.py (L1ReadReorgTest) + """ + + def __init__(self, ctx: flexitest.InitContext): + # standalone env: this test mutates the bitcoin chain via invalidateblock + ctx.set_env(StrataEnvConfig(pre_generate_blocks=110)) + + def main(self, ctx): + strata = self.get_service(ServiceType.Strata) + bitcoin = self.get_service(ServiceType.Bitcoin) + + rpc = strata.wait_for_rpc_ready(timeout=30) + btc_rpc = bitcoin.create_rpc() + + # Genesis L1 height = current bitcoin tip (set during env init). + # The ASM only creates manifests for heights >= genesis, so we must + # mine additional blocks and reorg within *those*, not below genesis. + genesis_tip = btc_rpc.proxy.getblockchaininfo()["blocks"] + logger.info(f"Genesis L1 tip: {genesis_tip}") + + # Mine blocks above genesis one at a time so the ASM processes each + # before the next arrives (avoids L1 reader / ASM notification race). + addr = btc_rpc.proxy.getnewaddress() + for _ in range(EXTRA_BLOCKS): + btc_rpc.proxy.generatetoaddress(1, addr) + tip_height = btc_rpc.proxy.getblockchaininfo()["blocks"] + logger.info(f"Bitcoin tip after extra mining: {tip_height}") + + # Pick a height to invalidate — must be above genesis. + invalidate_height = tip_height - REORG_DEPTH + assert invalidate_height > genesis_tip, ( + f"invalidate_height {invalidate_height} must be above genesis {genesis_tip}" + ) + logger.info(f"Will invalidate from height {invalidate_height}") + + # Wait for strata to have processed the block at this height. + pre_reorg_commitment = strata.wait_for_l1_commitment_at( + invalidate_height, rpc=rpc, timeout=120 + ) + logger.info(f"Pre-reorg commitment at {invalidate_height}: {pre_reorg_commitment}") + + # Invalidate the block (and all descendants). + block_hash = btc_rpc.proxy.getblockhash(invalidate_height) + logger.info(f"Invalidating block {block_hash}") + btc_rpc.proxy.invalidateblock(block_hash) + + # Sanity check: bitcoin tip should have regressed. + regressed_tip = btc_rpc.proxy.getblockchaininfo()["blocks"] + if regressed_tip >= invalidate_height: + raise AssertionError( + f"Expected tip below {invalidate_height} after invalidation, got {regressed_tip}" + ) + logger.info(f"Bitcoin tip regressed to {regressed_tip}") + + # Mine replacement blocks past the old invalidation point one at a time. + # Use a fresh address to avoid duplicate-invalid coinbase collisions. + reorg_addr = btc_rpc.proxy.getnewaddress() + blocks_to_mine = REORG_DEPTH + 2 + for _ in range(blocks_to_mine): + btc_rpc.proxy.generatetoaddress(1, reorg_addr) + post_tip = btc_rpc.proxy.getblockchaininfo()["blocks"] + logger.info(f"Post-reorg Bitcoin tip: {post_tip}") + + # Wait for strata to pick up the new chain; the commitment + # at invalidate_height must differ from the pre-reorg value. + post_reorg_commitment = strata.wait_for_l1_commitment_at( + invalidate_height, + rpc=rpc, + timeout=120, + differs_from=pre_reorg_commitment, + ) + logger.info(f"Post-reorg commitment at {invalidate_height}: {post_reorg_commitment}") + + logger.info( + "Strata detected L1 reorg: commitment changed at height %d", + invalidate_height, + ) + return True diff --git a/functional-tests-new/tests/btcio/test_l1_tracking.py b/functional-tests-new/tests/btcio/test_l1_tracking.py new file mode 100644 index 0000000000..5704337784 --- /dev/null +++ b/functional-tests-new/tests/btcio/test_l1_tracking.py @@ -0,0 +1,74 @@ +"""Test that strata tracks new L1 blocks as they are mined.""" + +import logging + +import flexitest + +from common.base_test import StrataNodeTest +from common.config import ServiceType +from envconfigs.strata import StrataEnvConfig + +logger = logging.getLogger(__name__) + + +@flexitest.register +class TestL1Tracking(StrataNodeTest): + """Verify strata's L1 reader picks up newly mined Bitcoin blocks. + + Mines additional Bitcoin blocks after strata is running and verifies + that strata_getL1HeaderCommitment returns data for the new heights. + + Uses a standalone env to avoid interference from other tests that + may restart the sequencer. + + Replaces old: btcio_read.py (strata_l1status) + """ + + EXTRA_BLOCKS = 5 + + def __init__(self, ctx: flexitest.InitContext): + ctx.set_env(StrataEnvConfig(pre_generate_blocks=110)) + + def main(self, ctx): + strata = self.get_service(ServiceType.Strata) + bitcoin = self.get_service(ServiceType.Bitcoin) + + rpc = strata.wait_for_rpc_ready(timeout=30) + btc_rpc = bitcoin.create_rpc() + + # Record the current Bitcoin tip + pre_tip = btc_rpc.proxy.getblockchaininfo()["blocks"] + logger.info(f"Bitcoin tip before mining: {pre_tip}") + + # Wait for strata to have caught up to pre_tip + strata.wait_for_l1_commitment_at(pre_tip, rpc=rpc, timeout=120) + + # Mine additional blocks one at a time. Mining in a single batch can + # trigger a race in the L1 reader / ASM pipeline where the ASM is + # notified about a block before the full block data is persisted. + addr = btc_rpc.proxy.getnewaddress() + for _ in range(self.EXTRA_BLOCKS): + btc_rpc.proxy.generatetoaddress(1, addr) + post_tip = btc_rpc.proxy.getblockchaininfo()["blocks"] + logger.info(f"Bitcoin tip after mining {self.EXTRA_BLOCKS} blocks: {post_tip}") + + if post_tip != pre_tip + self.EXTRA_BLOCKS: + raise AssertionError(f"Expected tip {pre_tip + self.EXTRA_BLOCKS}, got {post_tip}") + + # Wait for strata to pick up the new blocks + commitment = strata.wait_for_l1_commitment_at(post_tip, rpc=rpc, timeout=120) + logger.info(f"L1 header commitment at new tip {post_tip}: {commitment}") + + # Verify intermediate heights also have commitments. + # The tip is already confirmed, so intermediate heights should be + # available immediately — use a short timeout. + for h in range(pre_tip + 1, post_tip + 1): + strata.wait_for_l1_commitment_at(h, rpc=rpc, timeout=5) + + logger.info( + "Strata tracked all %d new L1 blocks (%d -> %d)", + self.EXTRA_BLOCKS, + pre_tip, + post_tip, + ) + return True diff --git a/functional-tests/envs/testenv.py b/functional-tests/envs/testenv.py index 87a895ff4d..2ecb6955f9 100644 --- a/functional-tests/envs/testenv.py +++ b/functional-tests/envs/testenv.py @@ -514,7 +514,6 @@ def init(self, ctx: flexitest.EnvContext) -> flexitest.LiveEnv: return BasicLiveEnv(svcs, bridge_pk, rollup_cfg, params) - # TODO: Maybe, we need to make it dynamic to enhance any EnvConfig with load testing capabilities. class LoadEnvConfig(BasicEnvConfig): _load_cfgs: list[LoadConfigBuilder] = []