Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/clickhouse/migrations/001_create_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,3 @@ CREATE TABLE IF NOT EXISTS ${DB}.l1_data_costs (
inserted_at DateTime64(3) DEFAULT now64()
) ENGINE = MergeTree()
ORDER BY (l1_block_number);

Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ ALTER TABLE ${DB}.orphaned_l2_hashes
ADD INDEX IF NOT EXISTS idx_orphaned_l2_block_hash_bf block_hash TYPE bloom_filter(0.01) GRANULARITY 1;
ALTER TABLE ${DB}.orphaned_l2_hashes MATERIALIZE INDEX idx_orphaned_l2_block_hash_bf;


-- preconf_data: filters by operator fields
ALTER TABLE ${DB}.preconf_data
ADD INDEX IF NOT EXISTS idx_preconf_current_op_bf current_operator TYPE bloom_filter(0.01) GRANULARITY 1,
Expand Down
13 changes: 13 additions & 0 deletions crates/clickhouse/migrations/019_create_orphaned_l1_hashes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- Migration 019: Create orphaned_l1_hashes table and index
--
-- Records L1 block hashes that were later replaced by a chain reorg.
-- Readers exclude these hashes from l1_head_events joins via
-- `block_hash NOT IN (SELECT block_hash FROM orphaned_l1_hashes)`.

CREATE TABLE IF NOT EXISTS ${DB}.orphaned_l1_hashes (
    block_hash FixedString(32),                -- 32-byte hash of the orphaned block
    l1_block_number UInt64,                    -- height the orphaned block occupied
    inserted_at DateTime64(3) DEFAULT now64()  -- when the orphan was recorded
) ENGINE = MergeTree()
ORDER BY (l1_block_number, block_hash);

-- orphaned_l1_hashes: lookups by block_hash
-- Bloom-filter skip index (1% false-positive rate) accelerates the point
-- lookups performed by the readers' NOT IN subqueries; MATERIALIZE builds
-- the index for any parts that already exist.
ALTER TABLE ${DB}.orphaned_l1_hashes
ADD INDEX IF NOT EXISTS idx_orphaned_l1_block_hash_bf block_hash TYPE bloom_filter(0.01) GRANULARITY 1;
ALTER TABLE ${DB}.orphaned_l1_hashes MATERIALIZE INDEX idx_orphaned_l1_block_hash_bf;
9 changes: 9 additions & 0 deletions crates/clickhouse/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,15 @@ pub struct OrphanedL2HashRow {
pub l2_block_number: u64,
}

/// Orphaned L1 block hash row for the `orphaned_l1_hashes` table.
///
/// One row is inserted for every L1 block hash replaced by a reorg; readers
/// exclude these hashes from `l1_head_events` queries via `NOT IN` subqueries.
#[derive(Debug, Row, Serialize, Deserialize, PartialEq, Eq)]
pub struct OrphanedL1HashRow {
    /// Block hash of the orphaned (rolled-back) block.
    /// Stored as FixedString(32) in ClickHouse (see migration 019).
    pub block_hash: HashBytes,
    /// L1 block number at which the orphaned block sat.
    pub l1_block_number: u64,
}

/// Verified batch row
#[derive(Debug, Row, Serialize, Deserialize, PartialEq, Eq)]
pub struct VerifiedBatchRow {
Expand Down
53 changes: 52 additions & 1 deletion crates/clickhouse/src/reader/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ impl ClickhouseReader {
)
}

/// Anti-subquery for L1: hides L1 blocks later rolled back by a reorg.
///
/// Returns the SQL snippet
/// `<alias>.block_hash NOT IN ( SELECT block_hash FROM <db>.orphaned_l1_hashes)`
/// for use in WHERE clauses over `l1_head_events` aliases.
fn reorg_filter_l1(&self, table_alias: &str) -> String {
    let db = &self.db_name;
    format!("{table_alias}.block_hash NOT IN ( SELECT block_hash FROM {db}.orphaned_l1_hashes)")
}

/// Get last L2 head time
pub async fn get_last_l2_head_time(&self) -> Result<Option<DateTime<Utc>>> {
let client = self.base.clone();
Expand Down Expand Up @@ -220,7 +232,8 @@ impl ClickhouseReader {
let sql = "SELECT max(l1_events.block_ts) AS block_ts \
FROM ?.batches b \
INNER JOIN ?.l1_head_events l1_events \
ON b.l1_block_number = l1_events.l1_block_number";
ON b.l1_block_number = l1_events.l1_block_number \
WHERE l1_events.block_hash NOT IN (SELECT block_hash FROM ?.orphaned_l1_hashes)";

let start = Instant::now();
let result = client
Expand Down Expand Up @@ -2961,9 +2974,11 @@ impl ClickhouseReader {
INNER JOIN {db}.l1_head_events l1_events \
ON b.l1_block_number = l1_events.l1_block_number \
WHERE l1_events.block_ts >= toUnixTimestamp(now64() - INTERVAL {interval}) \
AND {l1_filter} \
ORDER BY b.l1_block_number ASC",
interval = range.interval(),
db = self.db_name,
l1_filter = self.reorg_filter_l1("l1_events"),
);

let rows = self.execute::<BatchBlobCountRow>(&query).await?;
Expand Down Expand Up @@ -2994,6 +3009,8 @@ impl ClickhouseReader {
if let Some(end) = ending_before {
query.push_str(&format!(" AND b.batch_id > {}", end));
}
// Exclude L1 orphaned hashes
query.push_str(&format!(" AND {}", self.reorg_filter_l1("l1_events")));
query.push_str(" ORDER BY b.batch_id DESC");
query.push_str(&format!(" LIMIT {}", limit));

Expand Down Expand Up @@ -3163,6 +3180,40 @@ impl ClickhouseReader {
Ok(rows.into_iter().map(|r| (r.block_hash, r.l2_block_number)).collect())
}

/// Get the most recent L1 block hashes for the specified L1 block numbers
/// This is used to identify orphaned L1 blocks during reorgs
/// Get the most recent L1 block hash recorded for each requested L1 block number.
///
/// Used during L1 reorg handling: the caller passes the block numbers that
/// were rolled back and receives `(block_hash, l1_block_number)` pairs for
/// the hashes that are now orphaned, ordered by block number. Numbers with
/// no recorded header are simply absent from the result.
pub async fn get_latest_l1_hashes_for_blocks(
    &self,
    block_numbers: &[u64],
) -> Result<Vec<(HashBytes, u64)>> {
    // Nothing to look up — also avoids issuing `IN ()`, which is invalid SQL.
    if block_numbers.is_empty() {
        return Ok(vec![]);
    }

    #[derive(Row, Deserialize)]
    struct HashRow {
        block_hash: HashBytes,
        l1_block_number: u64,
    }

    // NOTE: interpolating here is injection-safe — `block_numbers` are u64s,
    // so the joined list can only contain digits and commas.
    let block_list = block_numbers.iter().map(|n| n.to_string()).collect::<Vec<_>>().join(",");

    // ROW_NUMBER() over insertion time selects the latest hash per block number.
    let query = format!(
        "SELECT block_hash, l1_block_number \
         FROM (\
             SELECT block_hash, l1_block_number, \
                 ROW_NUMBER() OVER (PARTITION BY l1_block_number ORDER BY inserted_at DESC) as rn \
             FROM {db}.l1_head_events \
             WHERE l1_block_number IN ({block_list})\
         ) ranked \
         WHERE rn = 1 \
         ORDER BY l1_block_number",
        db = self.db_name,
    );

    let rows = self.execute::<HashRow>(&query).await?;
    Ok(rows.into_iter().map(|r| (r.block_hash, r.l1_block_number)).collect())
}

/// Get combined L2 fees and batch components data for a unified endpoint
pub async fn get_l2_fees_and_components(
&self,
Expand Down
8 changes: 8 additions & 0 deletions crates/clickhouse/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub const TABLES: &[&str] = &[
"prove_costs",
"verify_costs",
"orphaned_l2_hashes",
"orphaned_l1_hashes",
];

/// Names of all materialized views
Expand Down Expand Up @@ -201,4 +202,11 @@ pub const TABLE_SCHEMAS: &[TableSchema] = &[
inserted_at DateTime64(3) DEFAULT now64()",
order_by: "l2_block_number, block_hash",
},
TableSchema {
name: "orphaned_l1_hashes",
columns: "block_hash FixedString(32),
l1_block_number UInt64,
inserted_at DateTime64(3) DEFAULT now64()",
order_by: "l1_block_number, block_hash",
},
];
22 changes: 20 additions & 2 deletions crates/clickhouse/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ use crate::{
L1Header,
models::{
BatchBlockRow, BatchRow, ForcedInclusionProcessedRow, L1DataCostInsertRow, L1HeadEvent,
L2HeadEvent, L2ReorgInsertRow, OrphanedL2HashRow, PreconfData, ProveCostInsertRow,
ProvedBatchRow, VerifiedBatchRow, VerifyCostInsertRow,
L2HeadEvent, L2ReorgInsertRow, OrphanedL1HashRow, OrphanedL2HashRow, PreconfData,
ProveCostInsertRow, ProvedBatchRow, VerifiedBatchRow, VerifyCostInsertRow,
},
schema::{TABLE_SCHEMAS, TABLES, TableSchema, VIEWS},
types::{AddressBytes, HashBytes},
Expand Down Expand Up @@ -508,6 +508,24 @@ impl ClickhouseWriter {
insert.end().await?;
Ok(())
}

/// Insert orphaned L1 block hashes
/// Record L1 block hashes that were orphaned by a reorg.
///
/// Writes one row per `(hash, block_number)` pair into
/// `<db>.orphaned_l1_hashes`; a no-op when `hashes` is empty.
pub async fn insert_orphaned_l1_hashes(&self, hashes: &[(HashBytes, u64)]) -> Result<()> {
    if hashes.is_empty() {
        return Ok(());
    }

    let client = self.base.clone();
    let table = format!("{}.orphaned_l1_hashes", self.db_name);
    let mut insert = client.insert(&table)?;

    for &(block_hash, l1_block_number) in hashes {
        insert.write(&OrphanedL1HashRow { block_hash, l1_block_number }).await?;
    }

    insert.end().await?;
    Ok(())
}
}

#[cfg(test)]
Expand Down
81 changes: 80 additions & 1 deletion crates/driver/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct ProcessorDriver {
clickhouse_reader: Option<ClickhouseReader>,
extractor: Extractor,
reorg_detector: ReorgDetector,
l1_reorg_detector: ReorgDetector,
last_l2_header: Option<(u64, Address)>,
enable_db_writes: bool,
incident_client: IncidentClient,
Expand All @@ -56,6 +57,7 @@ struct ProcessorComponents {
clickhouse_reader: Option<ClickhouseReader>,
extractor: Extractor,
reorg_detector: ReorgDetector,
l1_reorg_detector: ReorgDetector,
last_l2_header: Option<(u64, Address)>,
enable_db_writes: bool,
processed_l2_headers: VecDeque<BlockHash>,
Expand Down Expand Up @@ -133,6 +135,7 @@ impl ProcessorDriver {

// Initialize reorg detector
let reorg_detector = ReorgDetector::new();
let l1_reorg_detector = ReorgDetector::new();

// init incident client and component IDs if monitors are enabled
let (
Expand Down Expand Up @@ -168,6 +171,7 @@ impl ProcessorDriver {
clickhouse_reader,
extractor,
reorg_detector,
l1_reorg_detector,
last_l2_header: None,
enable_db_writes: opts.enable_db_writes,
incident_client,
Expand Down Expand Up @@ -246,6 +250,7 @@ impl ProcessorDriver {
clickhouse_reader,
extractor,
mut reorg_detector,
mut l1_reorg_detector,
mut last_l2_header,
enable_db_writes,
mut processed_l2_headers,
Expand All @@ -258,6 +263,7 @@ impl ProcessorDriver {
clickhouse_reader,
extractor,
&mut reorg_detector,
&mut l1_reorg_detector,
&mut last_l2_header,
enable_db_writes,
&mut processed_l2_headers,
Expand All @@ -274,6 +280,7 @@ impl ProcessorDriver {
clickhouse_reader: self.clickhouse_reader,
extractor: self.extractor,
reorg_detector: self.reorg_detector,
l1_reorg_detector: self.l1_reorg_detector,
last_l2_header: self.last_l2_header,
enable_db_writes: self.enable_db_writes,
processed_l2_headers: self.processed_l2_headers,
Expand Down Expand Up @@ -362,6 +369,7 @@ impl ProcessorDriver {
clickhouse_reader: Option<ClickhouseReader>,
extractor: Extractor,
reorg_detector: &mut ReorgDetector,
l1_reorg_detector: &mut ReorgDetector,
last_l2_header: &mut Option<(u64, Address)>,
enable_db_writes: bool,
processed_l2_headers: &mut VecDeque<BlockHash>,
Expand Down Expand Up @@ -403,6 +411,7 @@ impl ProcessorDriver {
&clickhouse_reader,
&extractor,
reorg_detector,
l1_reorg_detector,
last_l2_header,
enable_db_writes,
processed_l2_headers,
Expand Down Expand Up @@ -437,6 +446,7 @@ impl ProcessorDriver {
clickhouse_reader: &Option<ClickhouseReader>,
extractor: &Extractor,
reorg_detector: &mut ReorgDetector,
l1_reorg_detector: &mut ReorgDetector,
last_l2_header: &mut Option<(u64, Address)>,
enable_db_writes: bool,
processed_l2_headers: &mut VecDeque<BlockHash>,
Expand All @@ -452,6 +462,7 @@ impl ProcessorDriver {
clickhouse_reader,
extractor,
reorg_detector,
l1_reorg_detector,
last_l2_header,
enable_db_writes,
processed_l2_headers,
Expand Down Expand Up @@ -506,6 +517,7 @@ impl ProcessorDriver {
clickhouse_reader: &Option<ClickhouseReader>,
extractor: &Extractor,
reorg_detector: &mut ReorgDetector,
l1_reorg_detector: &mut ReorgDetector,
last_l2_header: &mut Option<(u64, Address)>,
enable_db_writes: bool,
processed_l2_headers: &mut VecDeque<BlockHash>,
Expand All @@ -521,6 +533,7 @@ impl ProcessorDriver {
clickhouse_reader,
extractor,
reorg_detector,
l1_reorg_detector,
last_l2_header,
processed_l2_headers,
kv_store,
Expand All @@ -542,6 +555,7 @@ impl ProcessorDriver {
clickhouse_reader: &Option<ClickhouseReader>,
extractor: &Extractor,
reorg_detector: &mut ReorgDetector,
_l1_reorg_detector: &mut ReorgDetector,
last_l2_header: &mut Option<(u64, Address)>,
processed_l2_headers: &mut VecDeque<BlockHash>,
kv_store: Option<&kv::Store>,
Expand Down Expand Up @@ -601,7 +615,14 @@ impl ProcessorDriver {
Self::handle_batches_verified_event(writer, extractor, wrapper.clone()).await
}
TaikoEvent::L1Header(header) => {
Self::handle_l1_header_event(writer, extractor, header.clone()).await
Self::handle_l1_header_event(
writer,
clickhouse_reader,
_l1_reorg_detector,
extractor,
header.clone(),
)
.await
}
TaikoEvent::L2Header(header) => {
Self::handle_l2_header(
Expand Down Expand Up @@ -849,9 +870,16 @@ impl ProcessorDriver {
/// Handle `L1Header` event with database insertion and preconf data processing
async fn handle_l1_header_event(
writer: &ClickhouseWriter,
clickhouse_reader: &Option<ClickhouseReader>,
l1_reorg_detector: &mut ReorgDetector,
extractor: &Extractor,
header: primitives::headers::L1Header,
) -> Result<()> {
// Detect L1 reorgs similar to L2 approach.
// Snapshot the previous head BEFORE feeding this header to the detector, so
// the orphaned range (old head vs. new head) can be computed further down
// if a reorg is reported.
let old_head = l1_reorg_detector.head_number();
let reorg_result =
    l1_reorg_detector.on_new_block_with_hash(header.number, B256::from(*header.hash));

// Insert L1 header
Self::with_db_error_context(
writer.insert_l1_header(&header),
Expand All @@ -865,6 +893,57 @@ impl ProcessorDriver {
// Process preconfirmation data (same as original driver)
Self::process_preconf_data(writer, extractor, &header).await;

// If an L1 reorg is detected, record orphaned L1 block hashes for the reorged range
if let Some((depth, orphaned_hash)) = reorg_result {
// Handle one-block reorg orphaned hash
if let Some(hash) = orphaned_hash {
if let Err(e) = writer
.insert_orphaned_l1_hashes(&[(HashBytes::from(hash), header.number)])
.await
{
tracing::error!(
block_number = header.number,
orphaned_hash = ?hash,
err = %e,
"Failed to insert orphaned L1 hash"
);
} else {
info!(block_number = header.number, orphaned_hash = ?hash, "Inserted orphaned L1 hash");
Copy link

Copilot AI Aug 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using info! for this logging is inconsistent with the error case above which uses tracing::error!. Consider using tracing::info! for consistency.

Suggested change
info!(block_number = header.number, orphaned_hash = ?hash, "Inserted orphaned L1 hash");
tracing::info!(block_number = header.number, orphaned_hash = ?hash, "Inserted orphaned L1 hash");

Copilot uses AI. Check for mistakes.
}
}

if depth > 0 {
if let Some(reader) = clickhouse_reader {
let new_head = header.number;
let orphaned_start = new_head.saturating_add(1);
Copy link

Copilot AI Aug 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic for calculating orphaned block range appears incorrect. For a reorg where new_head < old_head, the orphaned blocks should be from new_head + 1 to old_head, but new_head.saturating_add(1) will be greater than old_head, making the range invalid.

Suggested change
let orphaned_start = new_head.saturating_add(1);
let orphaned_start = new_head + 1;

Copilot uses AI. Check for mistakes.
let orphaned_end = old_head; // inclusive
if orphaned_end >= orphaned_start {
let orphaned_numbers: Vec<u64> = (orphaned_start..=orphaned_end).collect();
if let Ok(orphaned_hashes) =
reader.get_latest_l1_hashes_for_blocks(&orphaned_numbers).await
{
if !orphaned_hashes.is_empty() {
if let Err(e) =
writer.insert_orphaned_l1_hashes(&orphaned_hashes).await
{
tracing::error!(
count = orphaned_hashes.len(),
err = %e,
"Failed to insert orphaned L1 hashes"
);
} else {
info!(
Copy link

Copilot AI Aug 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using info! for this logging is inconsistent with the error case above which uses tracing::error!. Consider using tracing::info! for consistency.

Suggested change
info!(
tracing::info!(

Copilot uses AI. Check for mistakes.
count = orphaned_hashes.len(),
"Inserted orphaned L1 hashes for reorg"
);
}
}
}
}
}
}
}

Ok(())
}

Expand Down