Skip to content
10 changes: 8 additions & 2 deletions integration-tests/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ impl<'a> ChainLayout<'a> {
fn chain_id(self) -> Option<u64> {
match self {
ChainLayout::Default { .. } => None,
ChainLayout::MultiChain { chain_index, .. } => Some(6565u64 + chain_index as u64),
ChainLayout::MultiChain { chain_index, .. } => {
if chain_index == 0 {
Some(506u64)
} else {
Some(6565u64 - 1 + chain_index as u64)
}
}
}
}

Expand Down Expand Up @@ -135,7 +141,7 @@ fn load_config_from_path(config_path: &Path) -> Config {
Config {
genesis_config,
l1_sender_config: config_repo.single().unwrap().parse().unwrap(),
general_config: Default::default(),
general_config: config_repo.single().unwrap().parse().unwrap(),
network_config: Default::default(),
rpc_config: Default::default(),
mempool_config: Default::default(),
Expand Down
45 changes: 42 additions & 3 deletions integration-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use anyhow::Context;
use backon::ConstantBuilder;
use backon::Retryable;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::Path;
use std::process::Command;
use std::str::FromStr;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
Expand Down Expand Up @@ -482,12 +484,12 @@ impl MultiChainTester {

/// Get chain A (first chain)
pub fn chain_a(&self) -> &Tester {
self.chain(0)
self.chain(1)
}

/// Get chain B (second chain)
pub fn chain_b(&self) -> &Tester {
self.chain(1)
self.chain(2)
}
}

Expand Down Expand Up @@ -516,7 +518,7 @@ impl MultiChainTesterBuilder {
.await?;

// Launch L2 chains using chain configurations from config files
let mut chains = Vec::new();
let mut chains: Vec<Tester> = Vec::new();
for i in 0..num_chains {
// Load the chain config to get the chain ID, operator keys, and contract addresses
let chain_config = load_chain_config(ChainLayout::MultiChain {
Expand All @@ -527,17 +529,54 @@ impl MultiChainTesterBuilder {
.genesis_config
.chain_id
.expect("Chain ID must be set in chain config");
let gateway_rpc_url = chain_config
.general_config
.gateway_rpc_url
.map(|_| chains[0].l2_rpc_address.clone());
let ephemeral_state = chain_config.general_config.ephemeral_state;
let l1_sender_config = chain_config.l1_sender_config.clone();
let bridgehub_address = chain_config.genesis_config.bridgehub_address;
let bytecode_supplier_address = chain_config.genesis_config.bytecode_supplier_address;

let chain_override = move |config: &mut Config| {
if gateway_rpc_url.is_some() {
config.general_config.gateway_rpc_url = gateway_rpc_url;
}
config.genesis_config.chain_id = Some(chain_id);
config.genesis_config.bridgehub_address = bridgehub_address;
config.genesis_config.bytecode_supplier_address = bytecode_supplier_address;
config.l1_sender_config = l1_sender_config.clone();
// Use short block time for faster tests
config.sequencer_config.block_time = Duration::from_millis(500);

if let Some(ephemeral_state) = &ephemeral_state {
let ephemeral_state = Path::new("..").join(ephemeral_state);
tracing::info!("Loading ephemeral state from {}", ephemeral_state.display());
#[cfg(target_os = "macos")]
let tar = "gtar";
#[cfg(not(target_os = "macos"))]
let tar = "tar";
let status = Command::new(tar)
.args([
"-xvf",
ephemeral_state.to_string_lossy().as_ref(),
&format!(
"--one-top-level={}",
config.general_config.rocks_db_path.to_string_lossy()
),
])
.status()
.expect(
"failed to call `tar` command; ensure it is present on your machine",
);
if !status.success() {
panic!(
"`tar` command failed to decompress ephemeral state from `{}` to `{}`",
ephemeral_state.display(),
config.general_config.rocks_db_path.display(),
);
}
}
};

let tester = Tester::launch_node(
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/tests/interop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ async fn test_interop_bundle_send() -> Result<()> {
// This test validates the first part of the interop flow:
// setting up two chains and sending an interop bundle from chain A to chain B

let multi_chain = MultiChainTester::setup(2).await?;
let multi_chain = MultiChainTester::setup(3).await?;

let chain_a = multi_chain.chain_a();
let chain_b = multi_chain.chain_b();
Expand Down
16 changes: 14 additions & 2 deletions lib/batch_types/src/batch_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,24 @@ impl BatchInfo {
let (last_block_output, last_block_context, _, last_block_tree) = *blocks.last().unwrap();

let mut upgrade_tx_hash = None;

let mut dependency_roots_rolling_hash = B256::ZERO;

for (block_output, _, transactions, _) in blocks {
total_pubdata.extend(block_output.pubdata.clone());

for tx in transactions {
match tx.envelope() {
ZkEnvelope::System(_) | ZkEnvelope::L2(_) => {
ZkEnvelope::System(envelope) => {
number_of_layer2_txs += 1;

if let Some(roots) = envelope.interop_roots() {
for root in roots {
dependency_roots_rolling_hash = keccak256((dependency_roots_rolling_hash, root.chainId, root.blockOrBatchNumber, root.sides).abi_encode_packed());
}
}
}
ZkEnvelope::L2(_) => {
number_of_layer2_txs += 1;
}
ZkEnvelope::L1(l1_tx) => {
Expand Down Expand Up @@ -145,7 +157,7 @@ impl BatchInfo {
number_of_layer1_txs,
number_of_layer2_txs,
priority_operations_hash,
dependency_roots_rolling_hash: B256::ZERO,
dependency_roots_rolling_hash,
l2_to_l1_logs_root_hash,
l2_da_commitment_scheme: pubdata_mode.da_commitment_scheme(),
da_commitment: da_fields.da_commitment,
Expand Down
5 changes: 1 addition & 4 deletions lib/contract_interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,7 @@ alloy::sol! {

function addInteropRootsInBatch(InteropRoot[] calldata interopRootsInput);

// mapping(uint256 chainId => Bytes32PushTree tree) public chainTree;
// For some reason macro translates mapping to a function that returns uint256 instead of Bytes32PushTree.
// TODO: Worth opening an issue in alloy-rs.
function chainTree(uint256 chainId) public view returns (Bytes32PushTree);
function getChainTree(uint256 chainId) public view returns (Bytes32PushTree);

event AppendedChainBatchRoot(uint256 indexed chainId, uint256 indexed batchNumber, bytes32 chainBatchRoot);
function getMerklePathForChain(uint256 _chainId) external view returns (bytes32[] memory);
Expand Down
6 changes: 4 additions & 2 deletions lib/l1_sender/src/commands/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@ use zksync_os_contract_interface::{IExecutor, InteropRoot};
pub struct ExecuteCommand {
batches: Vec<SignedBatchEnvelope<FriProof>>,
priority_ops: Vec<PriorityOpsBatchInfo>,
interop_roots: Vec<Vec<InteropRoot>>,
}

impl ExecuteCommand {
pub fn new(
batches: Vec<SignedBatchEnvelope<FriProof>>,
priority_ops: Vec<PriorityOpsBatchInfo>,
interop_roots: Vec<Vec<InteropRoot>>,
) -> Self {
assert_eq!(batches.len(), priority_ops.len());
Self {
batches,
priority_ops,
interop_roots,
}
}
}
Expand Down Expand Up @@ -95,8 +98,7 @@ impl ExecuteCommand {
.cloned()
.map(IExecutor::PriorityOpsBatchInfo::from)
.collect::<Vec<_>>();
// For now interop roots are empty.
let interop_roots: Vec<Vec<InteropRoot>> = vec![vec![]; self.batches.len()];
let interop_roots = self.interop_roots.clone();

let encoded_data: Vec<u8> = match self.batches.first().unwrap().batch.protocol_version.minor
{
Expand Down
76 changes: 45 additions & 31 deletions lib/l1_watcher/src/interop_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use alloy::rpc::types::Log;
use std::collections::HashMap;

use alloy::rpc::types::{Log, Topic, ValueOrArray};
use alloy::sol_types::SolEvent;
use alloy::{primitives::Address, providers::DynProvider};
use zksync_os_contract_interface::IMessageRoot::AppendedChainRoot;
use zksync_os_contract_interface::IMessageRoot::NewInteropRoot;
use zksync_os_contract_interface::{Bridgehub, InteropRoot};
use zksync_os_mempool::subpools::interop_roots::InteropRootsSubpool;
use zksync_os_types::{IndexedInteropRoot, InteropRootsLogIndex};

use crate::watcher::{L1Watcher, L1WatcherError};
use crate::{L1WatcherConfig, ProcessL1Event};
use crate::{L1WatcherConfig, ProcessRawEvents};

pub struct InteropWatcher {
contract_address: Address,
Expand Down Expand Up @@ -40,56 +43,67 @@ impl InteropWatcher {
this.starting_interop_event_index.block_number,
config.max_blocks_to_process,
config.poll_interval,
this.into(),
Box::new(this),
);

Ok(l1_watcher)
}
}

#[async_trait::async_trait]
impl ProcessL1Event for InteropWatcher {
const NAME: &'static str = "interop_root";
impl ProcessRawEvents for InteropWatcher {
fn name(&self) -> &'static str {
"interop_root"
}

type SolEvent = AppendedChainRoot;
type WatchedEvent = AppendedChainRoot;
fn event_signatures(&self) -> Topic {
NewInteropRoot::SIGNATURE_HASH.into()
}

fn contract_address(&self) -> Address {
self.contract_address
fn contract_addresses(&self) -> ValueOrArray<Address> {
self.contract_address.into()
}

async fn process_event(
&mut self,
tx: AppendedChainRoot,
log: Log,
) -> Result<(), L1WatcherError> {
let current_log_index = InteropRootsLogIndex {
block_number: log.block_number.expect("Block number is required"),
index_in_block: log.log_index.expect("Log index is required"),
fn filter_events(&self, logs: Vec<Log>) -> Vec<Log> {
// we want to accept only the latest event for each log id
let mut indexes = HashMap::new();

for log in logs {
let sol_event = NewInteropRoot::decode_log(&log.inner)
.expect("failed to decode log")
.data;
indexes.insert(sol_event.blockNumber, log);
}

indexes.into_values().collect()
}

async fn process_raw_event(&mut self, log: Log) -> Result<(), L1WatcherError> {
let event = NewInteropRoot::decode_log(&log.inner)?.data;

let event_log_index = InteropRootsLogIndex {
block_number: log.block_number.unwrap(),
index_in_block: log.log_index.unwrap(),
};

if current_log_index < self.starting_interop_event_index {
if event_log_index < self.starting_interop_event_index {
tracing::debug!(
current_log_index = ?current_log_index,
log_id = ?event.logId,
starting_interop_event_index = ?self.starting_interop_event_index,
"skipping interop root event before starting index",
);
return Ok(());
}

let interop_root = InteropRoot {
chainId: tx.chainId,
blockOrBatchNumber: tx.batchNumber,
sides: vec![tx.chainRoot],
chainId: event.chainId,
blockOrBatchNumber: event.blockNumber,
sides: event.sides.clone(),
};

self.interop_roots_subpool
.add_root(IndexedInteropRoot {
log_index: current_log_index,
root: interop_root,
})
.await;

self.interop_roots_subpool.add_root(IndexedInteropRoot {
log_index: event_log_index,
root: interop_root,
}).await;
Ok(())
}
}
4 changes: 4 additions & 0 deletions lib/l1_watcher/src/persist_batch_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ impl<BatchStorage: WriteBatch, Finality: WriteFinality> ProcessRawEvents
(*self.zk_chain.address()).into()
}

fn filter_events(&self, logs: Vec<Log>) -> Vec<Log> {
logs
}

async fn process_raw_event(&mut self, log: Log) -> Result<(), L1WatcherError> {
let event_signature = log.topics()[0];
match event_signature {
Expand Down
6 changes: 6 additions & 0 deletions lib/l1_watcher/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub trait ProcessRawEvents: Send + Sync + 'static {
/// See [`alloy::rpc::types::Filter`] documentation for more details.
fn contract_addresses(&self) -> ValueOrArray<Address>;

fn filter_events(&self, logs: Vec<Log>) -> Vec<Log>;

/// Invoked each time a new log matching the filter is found.
async fn process_raw_event(&mut self, event: Log) -> Result<(), L1WatcherError>;
}
Expand All @@ -51,6 +53,10 @@ where
self.contract_address().into()
}

fn filter_events(&self, logs: Vec<Log>) -> Vec<Log> {
logs
}

async fn process_raw_event(&mut self, log: Log) -> Result<(), L1WatcherError> {
let sol_event = T::SolEvent::decode_log(&log.inner)?.data;
let watched_event =
Expand Down
3 changes: 3 additions & 0 deletions lib/l1_watcher/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ impl L1Watcher {
let events = self
.extract_logs_from_l1_blocks(from_block, to_block)
.await?;

let events = self.processor.filter_events(events);

METRICS.events_loaded[&self.processor.name()].inc_by(events.len() as u64);
METRICS.most_recently_scanned_l1_block[&self.processor.name()].set(to_block);

Expand Down
Loading
Loading