Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 38 additions & 4 deletions crates/asm/worker/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Service framework integration for ASM.

use std::marker;
use std::{marker, thread::sleep, time::Duration};

use bitcoin::hashes::Hash;
use serde::{Deserialize, Serialize};
Expand All @@ -10,7 +10,7 @@ use strata_service::{Response, Service, SyncService};
use strata_state::asm_state::AsmState;
use tracing::*;

use crate::{AsmWorkerServiceState, traits::WorkerContext};
use crate::{AsmWorkerServiceState, WorkerError, WorkerResult, traits::WorkerContext};

/// ASM service implementation using the service framework.
#[derive(Debug)]
Expand Down Expand Up @@ -65,7 +65,7 @@ impl<W: WorkerContext + Send + Sync + 'static> SyncService for AsmWorkerService<
let mut pivot_anchor = ctx.get_anchor_state(&pivot_block);

while pivot_anchor.is_err() && pivot_block.height() >= genesis_height {
let block = ctx.get_l1_block(pivot_block.blkid())?;
let block = get_l1_block_with_retry(ctx, pivot_block.blkid())?;
let parent_height = pivot_block.height() - 1;
let parent_block_id =
L1BlockCommitment::new(parent_height, block.header.prev_blockhash.to_l1_block_id());
Expand Down Expand Up @@ -107,7 +107,7 @@ impl<W: WorkerContext + Send + Sync + 'static> SyncService for AsmWorkerService<
);
let _genesis_guard = genesis_span.enter();
// Fetch the genesis block (should work now since L1 reader processed it)
let genesis_block = ctx.get_l1_block(pivot_block.blkid())?;
let genesis_block = get_l1_block_with_retry(ctx, pivot_block.blkid())?;

// Compute wtxids_root and create manifest
let wtxids_root: strata_primitives::Buf32 = genesis_block
Expand Down Expand Up @@ -183,6 +183,40 @@ impl<W: WorkerContext + Send + Sync + 'static> SyncService for AsmWorkerService<
}
}

/// Fetches an L1 block, retrying on transient [`WorkerError::MissingL1Block`] errors.
///
/// The L1 reader may notify the ASM worker before the block data is fully
/// available (e.g. canonical-chain DB write hasn't propagated yet, or the
/// Bitcoin RPC times out under load). This bridges the gap with exponential
/// backoff: 200 ms base, 1.5x growth, 2 s cap, 10 retries (~10 s total).
fn get_l1_block_with_retry<W: WorkerContext>(
ctx: &W,
blockid: &L1BlockId,
) -> WorkerResult<bitcoin::Block> {
const MAX_RETRIES: u32 = 10;
const BASE_DELAY_MS: u64 = 200;
const MAX_DELAY_MS: u64 = 2000;
Comment thread
voidash marked this conversation as resolved.

let mut delay_ms = BASE_DELAY_MS;
for attempt in 0..=MAX_RETRIES {
match ctx.get_l1_block(blockid) {
Ok(block) => return Ok(block),
Err(WorkerError::MissingL1Block(id)) if attempt < MAX_RETRIES => {
warn!(
attempt,
?id,
delay_ms,
"L1 block not yet available, retrying"
);
sleep(Duration::from_millis(delay_ms));
delay_ms = (delay_ms * 3 / 2).min(MAX_DELAY_MS);
}
Err(e) => return Err(e),
}
}
unreachable!()
Comment thread
prajwolrg marked this conversation as resolved.
}

/// Status information for the ASM worker service.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AsmWorkerStatus {
Expand Down
37 changes: 37 additions & 0 deletions functional-tests-new/common/services/strata.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,43 @@ def wait_for_additional_blocks(
)
return self.get_cur_block_height(rpc)

def wait_for_l1_commitment_at(
self,
height: int,
rpc: JsonRpcClient | None = None,
timeout: int = 60,
poll_interval: float = 0.5,
differs_from: object | None = None,
) -> object:
"""Wait for an L1 header commitment at a given height.

Args:
height: L1 block height to check.
rpc: Optional RPC client. If None, creates a new one.
timeout: Maximum time to wait in seconds.
poll_interval: How often to poll.
differs_from: If set, also require the commitment to differ
from this value (useful for reorg detection).

Returns:
The L1 header commitment value.
"""
if rpc is None:
rpc = self.create_rpc()

def predicate(v):
if v is None:
return False
return differs_from is None or v != differs_from

return wait_until_with_value(
lambda: rpc.strata_getL1HeaderCommitment(height),
predicate,
error_with=f"No L1 header commitment at height {height}",
timeout=timeout,
step=poll_interval,
)

def check_block_generation_in_range(self, rpc: JsonRpcClient, start: int, end: int) -> int:
"""Checks for range of blocks produced and returns current block height"""
logger.info(f"Waiting for blocks from {start} to {end} be produced...")
Expand Down
6 changes: 5 additions & 1 deletion functional-tests-new/entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ def disabled_tests() -> frozenset[str]:
Can be extended via DISABLED_TESTS env var (comma-separated).
"""
base_disabled = frozenset(
["keepalive_stub_test", "revert_ol_state_fn", "revert_checkpointed_block_fn"]
[
"keepalive_stub_test",
"revert_ol_state_fn",
"revert_checkpointed_block_fn",
]
)

env_disabled = os.getenv("DISABLED_TESTS", "")
Expand Down
Empty file.
46 changes: 46 additions & 0 deletions functional-tests-new/tests/btcio/test_l1_connected.py
Comment thread
voidash marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Test that strata is connected to Bitcoin and tracking L1 blocks."""

import logging

import flexitest

from common.base_test import StrataNodeTest
from common.config import ServiceType

logger = logging.getLogger(__name__)


@flexitest.register
class TestL1Connected(StrataNodeTest):
"""Verify strata can see L1 blocks.

The basic env pre-generates 110 Bitcoin blocks before starting strata.
After strata starts, it should have L1 header commitments for those
blocks. We check that the genesis L1 height has a commitment, which
proves the L1 reader is connected and processing blocks.

Replaces old: btcio_connect.py (strata_l1connected)
"""

def __init__(self, ctx: flexitest.InitContext):
ctx.set_env("basic")

def main(self, ctx):
strata = self.get_service(ServiceType.Strata)
bitcoin = self.get_service(ServiceType.Bitcoin)

rpc = strata.wait_for_rpc_ready(timeout=30)
btc_rpc = bitcoin.create_rpc()

# The basic env pre-generates 110 blocks. The genesis L1 height
# equals the Bitcoin tip at the time strata started (~110).
# The ASM only creates manifests for heights >= genesis, so we
# check the genesis height itself.
chain_info = btc_rpc.proxy.getblockchaininfo()
tip_height = chain_info["blocks"]
logger.info(f"Bitcoin tip (genesis L1 height): {tip_height}")

commitment = strata.wait_for_l1_commitment_at(tip_height, rpc=rpc, timeout=60)

logger.info(f"L1 header commitment at {tip_height}: {commitment}")
return True
104 changes: 104 additions & 0 deletions functional-tests-new/tests/btcio/test_l1_reorg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""Test that strata handles Bitcoin L1 chain reorganizations."""

import logging

import flexitest

from common.base_test import StrataNodeTest
from common.config import ServiceType
from envconfigs.strata import StrataEnvConfig

logger = logging.getLogger(__name__)

# How many blocks above genesis to mine before triggering reorg.
EXTRA_BLOCKS = 6
# How many of those extra blocks to invalidate.
REORG_DEPTH = 3


@flexitest.register
class TestL1Reorg(StrataNodeTest):
"""Verify strata detects and handles L1 block reorganizations.

Mines blocks above genesis so the ASM has manifests to compare,
then invalidates some of those blocks, mines replacements, and
checks that strata updates its L1 header commitments.

Replaces old: btcio_read_reorg.py (L1ReadReorgTest)
"""

def __init__(self, ctx: flexitest.InitContext):
# standalone env: this test mutates the bitcoin chain via invalidateblock
ctx.set_env(StrataEnvConfig(pre_generate_blocks=110))

def main(self, ctx):
strata = self.get_service(ServiceType.Strata)
bitcoin = self.get_service(ServiceType.Bitcoin)

rpc = strata.wait_for_rpc_ready(timeout=30)
btc_rpc = bitcoin.create_rpc()

# Genesis L1 height = current bitcoin tip (set during env init).
# The ASM only creates manifests for heights >= genesis, so we must
# mine additional blocks and reorg within *those*, not below genesis.
genesis_tip = btc_rpc.proxy.getblockchaininfo()["blocks"]
logger.info(f"Genesis L1 tip: {genesis_tip}")

# Mine blocks above genesis one at a time so the ASM processes each
# before the next arrives (avoids L1 reader / ASM notification race).
addr = btc_rpc.proxy.getnewaddress()
for _ in range(EXTRA_BLOCKS):
btc_rpc.proxy.generatetoaddress(1, addr)
tip_height = btc_rpc.proxy.getblockchaininfo()["blocks"]
logger.info(f"Bitcoin tip after extra mining: {tip_height}")

# Pick a height to invalidate — must be above genesis.
invalidate_height = tip_height - REORG_DEPTH
assert invalidate_height > genesis_tip, (
f"invalidate_height {invalidate_height} must be above genesis {genesis_tip}"
)
logger.info(f"Will invalidate from height {invalidate_height}")

# Wait for strata to have processed the block at this height.
pre_reorg_commitment = strata.wait_for_l1_commitment_at(
invalidate_height, rpc=rpc, timeout=120
)
logger.info(f"Pre-reorg commitment at {invalidate_height}: {pre_reorg_commitment}")

# Invalidate the block (and all descendants).
block_hash = btc_rpc.proxy.getblockhash(invalidate_height)
logger.info(f"Invalidating block {block_hash}")
btc_rpc.proxy.invalidateblock(block_hash)

# Sanity check: bitcoin tip should have regressed.
regressed_tip = btc_rpc.proxy.getblockchaininfo()["blocks"]
if regressed_tip >= invalidate_height:
raise AssertionError(
f"Expected tip below {invalidate_height} after invalidation, got {regressed_tip}"
)
logger.info(f"Bitcoin tip regressed to {regressed_tip}")

# Mine replacement blocks past the old invalidation point one at a time.
# Use a fresh address to avoid duplicate-invalid coinbase collisions.
reorg_addr = btc_rpc.proxy.getnewaddress()
blocks_to_mine = REORG_DEPTH + 2
for _ in range(blocks_to_mine):
btc_rpc.proxy.generatetoaddress(1, reorg_addr)
post_tip = btc_rpc.proxy.getblockchaininfo()["blocks"]
logger.info(f"Post-reorg Bitcoin tip: {post_tip}")

# Wait for strata to pick up the new chain; the commitment
# at invalidate_height must differ from the pre-reorg value.
post_reorg_commitment = strata.wait_for_l1_commitment_at(
invalidate_height,
rpc=rpc,
timeout=120,
differs_from=pre_reorg_commitment,
)
logger.info(f"Post-reorg commitment at {invalidate_height}: {post_reorg_commitment}")

logger.info(
"Strata detected L1 reorg: commitment changed at height %d",
invalidate_height,
)
return True
74 changes: 74 additions & 0 deletions functional-tests-new/tests/btcio/test_l1_tracking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Test that strata tracks new L1 blocks as they are mined."""

import logging

import flexitest

from common.base_test import StrataNodeTest
from common.config import ServiceType
from envconfigs.strata import StrataEnvConfig

logger = logging.getLogger(__name__)


@flexitest.register
class TestL1Tracking(StrataNodeTest):
"""Verify strata's L1 reader picks up newly mined Bitcoin blocks.

Mines additional Bitcoin blocks after strata is running and verifies
that strata_getL1HeaderCommitment returns data for the new heights.

Uses a standalone env to avoid interference from other tests that
may restart the sequencer.

Replaces old: btcio_read.py (strata_l1status)
"""

EXTRA_BLOCKS = 5

def __init__(self, ctx: flexitest.InitContext):
ctx.set_env(StrataEnvConfig(pre_generate_blocks=110))

def main(self, ctx):
strata = self.get_service(ServiceType.Strata)
bitcoin = self.get_service(ServiceType.Bitcoin)

rpc = strata.wait_for_rpc_ready(timeout=30)
btc_rpc = bitcoin.create_rpc()

# Record the current Bitcoin tip
pre_tip = btc_rpc.proxy.getblockchaininfo()["blocks"]
logger.info(f"Bitcoin tip before mining: {pre_tip}")

# Wait for strata to have caught up to pre_tip
strata.wait_for_l1_commitment_at(pre_tip, rpc=rpc, timeout=120)

# Mine additional blocks one at a time. Mining in a single batch can
# trigger a race in the L1 reader / ASM pipeline where the ASM is
# notified about a block before the full block data is persisted.
addr = btc_rpc.proxy.getnewaddress()
for _ in range(self.EXTRA_BLOCKS):
btc_rpc.proxy.generatetoaddress(1, addr)
post_tip = btc_rpc.proxy.getblockchaininfo()["blocks"]
logger.info(f"Bitcoin tip after mining {self.EXTRA_BLOCKS} blocks: {post_tip}")

if post_tip != pre_tip + self.EXTRA_BLOCKS:
raise AssertionError(f"Expected tip {pre_tip + self.EXTRA_BLOCKS}, got {post_tip}")

# Wait for strata to pick up the new blocks
commitment = strata.wait_for_l1_commitment_at(post_tip, rpc=rpc, timeout=120)
logger.info(f"L1 header commitment at new tip {post_tip}: {commitment}")

# Verify intermediate heights also have commitments.
# The tip is already confirmed, so intermediate heights should be
# available immediately — use a short timeout.
for h in range(pre_tip + 1, post_tip + 1):
strata.wait_for_l1_commitment_at(h, rpc=rpc, timeout=5)

logger.info(
"Strata tracked all %d new L1 blocks (%d -> %d)",
self.EXTRA_BLOCKS,
pre_tip,
post_tip,
)
return True
1 change: 0 additions & 1 deletion functional-tests/envs/testenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,6 @@ def init(self, ctx: flexitest.EnvContext) -> flexitest.LiveEnv:
return BasicLiveEnv(svcs, bridge_pk, rollup_cfg, params)



# TODO: Maybe, we need to make it dynamic to enhance any EnvConfig with load testing capabilities.
class LoadEnvConfig(BasicEnvConfig):
_load_cfgs: list[LoadConfigBuilder] = []
Expand Down
Loading