Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
3b91b6c
implement accumulated interop roots transaction
Artemka374 Jan 26, 2026
034d422
add some docstrings
Artemka374 Jan 26, 2026
9fc954c
fix clippy
Artemka374 Jan 26, 2026
8b51e5b
fmt
Artemka374 Jan 26, 2026
9d91fd6
a few more docs
Artemka374 Jan 26, 2026
ca200fa
rename transactions to roots
Artemka374 Jan 26, 2026
75ddf7d
implement separate stream for interop transactions
Artemka374 Jan 27, 2026
47efa94
Merge branch 'main' into afo/accumulated-interop-txs
Artemka374 Jan 27, 2026
4944e47
some cleanup
Artemka374 Jan 27, 2026
2fb1da8
fix a few things
Artemka374 Jan 27, 2026
baac86e
remove block output ext
Artemka374 Jan 27, 2026
498d414
Merge branch 'main' into afo/accumulated-interop-txs
Artemka374 Jan 28, 2026
61839e3
use log id instead of log index in block
Artemka374 Jan 28, 2026
f15ea32
add copy-pasted binary search
Artemka374 Jan 28, 2026
c7996a3
address comments
Artemka374 Jan 29, 2026
c608bcd
Merge branch 'main' into afo/accumulated-interop-txs
Artemka374 Jan 29, 2026
e48cb58
fix stream
Artemka374 Jan 29, 2026
7af8030
Merge branch 'afo/accumulated-interop-txs' into afo/use-interop-log-id
Artemka374 Jan 29, 2026
527175c
resolve conflicts
Artemka374 Jan 29, 2026
f4d6a55
Merge branch 'main' into afo/accumulated-interop-txs
Artemka374 Jan 29, 2026
f7588e1
fix clippy
Artemka374 Jan 29, 2026
59c921b
fix: Interop roots flooding (#845)
Artemka374 Jan 30, 2026
9034115
send 1 root per tx
Artemka374 Jan 30, 2026
1ad049c
separate pool and storage
Artemka374 Jan 30, 2026
8844d35
fix build/use iterator instead of stream
Artemka374 Jan 30, 2026
1c185ab
fix wrong status
Artemka374 Jan 30, 2026
9ff7929
Merge branch 'main' into afo/separated-interop-tx-pool-storage
Artemka374 Jan 30, 2026
4726470
fix a few things
Artemka374 Feb 2, 2026
c1fba76
a few more fixes
Artemka374 Feb 2, 2026
da31588
address comments
Artemka374 Feb 2, 2026
94fbc4c
Merge branch 'afo/separated-interop-tx-pool-storage' into afo/use-int…
Artemka374 Feb 2, 2026
1113ac2
make delay between service blocks a config value
Artemka374 Feb 2, 2026
a20c3ec
Merge branch 'afo/separated-interop-tx-pool-storage' into afo/use-int…
Artemka374 Feb 2, 2026
8ddac48
fix build
Artemka374 Feb 2, 2026
e6c6cde
remove v2 of network replay
Artemka374 Feb 2, 2026
731278e
Merge branch 'main' into afo/use-interop-log-id
Artemka374 Feb 3, 2026
d9ef734
Merge branch 'main' into afo/use-interop-log-id
Artemka374 Feb 3, 2026
d955ac1
resolve conflicts
Artemka374 Feb 3, 2026
c6cf0c7
polish watcher
Artemka374 Feb 4, 2026
cce1d6d
address comments, cut out 1 root per tx
Artemka374 Feb 9, 2026
89462f7
filter events to accept only the latest event with identical id
Artemka374 Feb 9, 2026
36bdd86
Merge branch 'main' into afo/use-interop-log-id
Artemka374 Feb 9, 2026
4e4893a
fix clippy
Artemka374 Feb 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 43 additions & 6 deletions lib/contract_interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::IBridgehub::{
IBridgehubInstance, L2TransactionRequestDirect, L2TransactionRequestTwoBridgesOuter,
requestL2TransactionDirectCall, requestL2TransactionTwoBridgesCall,
};
use crate::IMessageRoot::IMessageRootInstance;
use crate::IMultisigCommitter::IMultisigCommitterInstance;
use crate::IZKChain::IZKChainInstance;
use alloy::contract::SolCallBuilder;
Expand Down Expand Up @@ -58,13 +59,9 @@ alloy::sol! {
// Event that is being emmited by L1
event AppendedChainRoot(uint256 indexed chainId, uint256 indexed batchNumber, bytes32 indexed chainRoot);

function addInteropRoot (
uint256 chainId,
uint256 blockOrBatchNumber,
bytes32[] calldata sides
);

function addInteropRootsInBatch(InteropRoot[] calldata interopRootsInput);

uint256 public totalPublishedInteropRoots;
}

// `ZKChainStorage.sol`
Expand Down Expand Up @@ -357,6 +354,46 @@ alloy::sol! {
}
}

pub struct MessageRoot<P: Provider> {
instance: IMessageRootInstance<P, Ethereum>,
address: Address,
}

impl<P: Provider> MessageRoot<P> {
pub fn new(address: Address, provider: P) -> Self {
let instance = IMessageRoot::new(address, provider);
Self { instance, address }
}

pub fn address(&self) -> &Address {
&self.address
}

pub fn provider(&self) -> &P {
self.instance.provider()
}

pub async fn total_published_interop_roots(&self, block_id: BlockId) -> Result<u64> {
self.instance
.totalPublishedInteropRoots()
.block(block_id)
.call()
.await
.map(|n| n.saturating_to())
.enrich("totalPublishedInteropRoots", Some(block_id))
}

pub async fn code_exists_at_block(&self, block_id: BlockId) -> alloy::contract::Result<bool> {
let code = self
.provider()
.get_code_at(*self.address())
.block_id(block_id)
.await?;

Ok(!code.0.is_empty())
}
}

#[derive(Clone, Debug)]
pub struct Bridgehub<P: Provider> {
instance: IBridgehubInstance<P, Ethereum>,
Expand Down
93 changes: 55 additions & 38 deletions lib/l1_watcher/src/interop_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,93 +1,110 @@
use alloy::rpc::types::Log;
use std::collections::HashMap;

use alloy::primitives::ruint::FromUintError;
use alloy::rpc::types::{Log, Topic, ValueOrArray};
use alloy::sol_types::SolEvent;
use alloy::{primitives::Address, providers::DynProvider};
use zksync_os_contract_interface::IMessageRoot::AppendedChainRoot;
use zksync_os_contract_interface::IMessageRoot::NewInteropRoot;
use zksync_os_contract_interface::{Bridgehub, InteropRoot};
use zksync_os_mempool::InteropTxPool;
use zksync_os_types::{IndexedInteropRoot, InteropRootsLogIndex};
use zksync_os_types::IndexedInteropRoot;

use crate::util::find_l1_block_by_interop_root_id;
use crate::watcher::{L1Watcher, L1WatcherError};
use crate::{L1WatcherConfig, ProcessL1Event};
use crate::{L1WatcherConfig, ProcessRawEvents};

pub struct InteropWatcher {
contract_address: Address,
starting_interop_event_index: InteropRootsLogIndex,
starting_interop_root_id: u64,
tx_pool: InteropTxPool,
}

impl InteropWatcher {
pub async fn create_watcher(
bridgehub: Bridgehub<DynProvider>,
config: L1WatcherConfig,
starting_interop_event_index: InteropRootsLogIndex,
starting_interop_root_id: u64,
tx_pool: InteropTxPool,
) -> anyhow::Result<L1Watcher> {
let contract_address = bridgehub.message_root_address().await?;

tracing::info!(
contract_address = ?contract_address,
starting_interop_event_index = ?starting_interop_event_index,
starting_interop_root_id = ?starting_interop_root_id,
"initializing interop watcher"
);

let this = Self {
contract_address,
starting_interop_event_index,
starting_interop_root_id,
tx_pool,
};

let l1_watcher = L1Watcher::new(
let next_l1_block =
find_l1_block_by_interop_root_id(bridgehub.clone(), starting_interop_root_id).await?;

Ok(L1Watcher::new(
bridgehub.provider().clone(),
this.starting_interop_event_index.block_number,
next_l1_block,
config.max_blocks_to_process,
config.poll_interval,
this.into(),
);

Ok(l1_watcher)
Box::new(this),
))
}
}

#[async_trait::async_trait]
impl ProcessL1Event for InteropWatcher {
const NAME: &'static str = "interop_root";
impl ProcessRawEvents for InteropWatcher {
fn name(&self) -> &'static str {
"interop_root"
}

type SolEvent = AppendedChainRoot;
type WatchedEvent = AppendedChainRoot;
fn event_signatures(&self) -> Topic {
NewInteropRoot::SIGNATURE_HASH.into()
}

fn contract_address(&self) -> Address {
self.contract_address
fn contract_addresses(&self) -> ValueOrArray<Address> {
self.contract_address.into()
}

async fn process_event(
&mut self,
tx: AppendedChainRoot,
log: Log,
) -> Result<(), L1WatcherError> {
let current_log_index = InteropRootsLogIndex {
block_number: log.block_number.expect("Block number is required"),
index_in_block: log.log_index.expect("Log index is required"),
};
fn filter_events(&self, logs: Vec<Log>) -> Vec<Log> {
// we want to accept only the latest event for each log id
let mut indexes = HashMap::new();

if current_log_index < self.starting_interop_event_index {
for log in logs {
let sol_event = NewInteropRoot::decode_log(&log.inner)
.expect("failed to decode log")
.data;
indexes.insert(sol_event.logId, log);
}

indexes.into_values().collect()
}

async fn process_raw_event(&mut self, log: Log) -> Result<(), L1WatcherError> {
let event = NewInteropRoot::decode_log(&log.inner)?.data;

if event.logId < self.starting_interop_root_id {
tracing::debug!(
current_log_index = ?current_log_index,
starting_interop_event_index = ?self.starting_interop_event_index,
log_id = ?event.logId,
starting_interop_root_id = self.starting_interop_root_id,
"skipping interop root event before starting index",
);
return Ok(());
}

let interop_root = InteropRoot {
chainId: tx.chainId,
blockOrBatchNumber: tx.batchNumber,
sides: vec![tx.chainRoot],
chainId: event.chainId,
blockOrBatchNumber: event.blockNumber,
sides: event.sides.clone(),
};

self.tx_pool.add_root(IndexedInteropRoot {
log_index: current_log_index,
log_id: event
.logId
.try_into()
.map_err(|e: FromUintError<u64>| L1WatcherError::Other(e.into()))?,
root: interop_root,
});

Ok(())
}
}
6 changes: 6 additions & 0 deletions lib/l1_watcher/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub trait ProcessRawEvents: Send + Sync + 'static {
/// See [`alloy::rpc::types::Filter`] documentation for more details.
fn contract_addresses(&self) -> ValueOrArray<Address>;

fn filter_events(&self, logs: Vec<Log>) -> Vec<Log>;

/// Invoked each time a new log matching the filter is found.
async fn process_raw_event(&mut self, event: Log) -> Result<(), L1WatcherError>;
}
Expand All @@ -51,6 +53,10 @@ where
self.contract_address().into()
}

fn filter_events(&self, logs: Vec<Log>) -> Vec<Log> {
logs
}

async fn process_raw_event(&mut self, log: Log) -> Result<(), L1WatcherError> {
let sol_event = T::SolEvent::decode_log(&log.inner)?.data;
let watched_event =
Expand Down
48 changes: 47 additions & 1 deletion lib/l1_watcher/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use zksync_os_batch_types::{BatchInfo, DiscoveredCommittedBatch};
use zksync_os_contract_interface::IExecutor::ReportCommittedBatchRangeZKsyncOS;
use zksync_os_contract_interface::calldata::CommitCalldata;
use zksync_os_contract_interface::models::CommitBatchInfo;
use zksync_os_contract_interface::{IExecutor, ZkChain};
use zksync_os_contract_interface::{Bridgehub, IExecutor, MessageRoot, ZkChain};
use zksync_os_types::ProtocolSemanticVersion;

pub const ANVIL_L1_CHAIN_ID: u64 = 31337;
Expand Down Expand Up @@ -273,6 +273,52 @@ pub async fn find_l1_execute_block_by_batch_number(
.await
}

pub async fn find_l1_block_by_interop_root_id(
bridgehub: Bridgehub<DynProvider>,
next_interop_root_id: u64,
) -> anyhow::Result<BlockNumber> {
let message_root_address = bridgehub.message_root_address().await?;
let message_root = Arc::new(MessageRoot::new(
message_root_address,
bridgehub.provider().clone(),
));

let latest = message_root.provider().get_block_number().await?;

let guarded_predicate =
async |message_root: Arc<MessageRoot<DynProvider>>, block: u64| -> anyhow::Result<bool> {
if !message_root.code_exists_at_block(block.into()).await? {
// return early if contract is not deployed yet - otherwise `predicate` might fail
return Ok(false);
}
let res = message_root
.total_published_interop_roots(block.into())
.await?;
Ok(res >= next_interop_root_id)
};

// Ensure the predicate is true by the upper bound, or bail early.
if !guarded_predicate(message_root.clone(), latest).await? {
anyhow::bail!(
"Condition not satisfied up to latest block: contract not deployed yet \
or target not reached.",
);
}

// Binary search on [0, latest] for the first block where predicate is true.
let (mut lo, mut hi) = (0, latest);
while lo < hi {
let mid = (lo + hi) / 2;
if guarded_predicate(message_root.clone(), mid).await? {
hi = mid;
} else {
lo = mid + 1;
}
}

Ok(lo)
}

/// Fetches and decodes stored batch data for batch `batch_number` that is expected to have been
/// committed in `l1_block_number`. Returns `None` if requested batch has not been committed in
/// the given L1 block.
Expand Down
2 changes: 2 additions & 0 deletions lib/l1_watcher/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ impl L1Watcher {
let events = self
.extract_logs_from_l1_blocks(from_block, to_block)
.await?;
let events = self.processor.filter_events(events);

METRICS.events_loaded[&self.processor.name()].inc_by(events.len() as u64);
METRICS.most_recently_scanned_l1_block[&self.processor.name()].set(to_block);

Expand Down
26 changes: 6 additions & 20 deletions lib/mempool/src/interop_tx_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ use tokio::{
time::{Sleep, sleep_until},
};
use tokio_stream::wrappers::BroadcastStream;
use zksync_os_types::{
IndexedInteropRoot, InteropRoot, InteropRootsEnvelope, InteropRootsLogIndex,
};
use zksync_os_types::{IndexedInteropRoot, InteropRoot, InteropRootsEnvelope};

#[derive(Clone)]
pub struct InteropTxPool {
Expand Down Expand Up @@ -45,10 +43,7 @@ impl InteropTxPool {
self.inner.write().unwrap().add_root(root);
}

pub fn on_canonical_state_change(
&mut self,
txs: Vec<InteropRootsEnvelope>,
) -> Option<InteropRootsLogIndex> {
pub fn on_canonical_state_change(&mut self, txs: Vec<InteropRootsEnvelope>) -> Option<u64> {
self.inner.write().unwrap().on_canonical_state_change(txs)
}
}
Expand Down Expand Up @@ -147,16 +142,8 @@ impl InteropTxPoolInner {

/// Cleans up the stream and removes all roots that were sent in transactions
/// Returns the last log index of executed interop root
pub fn on_canonical_state_change(
&mut self,
txs: Vec<InteropRootsEnvelope>,
) -> Option<InteropRootsLogIndex> {
if txs.is_empty() {
return None;
}

let mut log_index = InteropRootsLogIndex::default();

pub fn on_canonical_state_change(&mut self, txs: Vec<InteropRootsEnvelope>) -> Option<u64> {
let mut log_id = None;
for tx in txs {
let starting_index = self.pending_roots.len() - tx.interop_roots_count() as usize;

Expand All @@ -169,11 +156,10 @@ impl InteropTxPoolInner {
let envelope = InteropRootsEnvelope::from_interop_roots(
roots.iter().map(|r| r.root.clone()).collect(),
);
log_index = roots.last().unwrap().log_index.clone();
log_id = Some(roots.last().unwrap().log_id);

assert_eq!(&envelope, &tx);
}

Some(log_index)
log_id
}
}
Loading
Loading