fix(eth-watch): return internal errors from BatchRootProcessor (#4285)
## What ❔

Return internal errors from `BatchRootProcessor` instead of client errors.

## Why ❔

`BatchRootProcessor` has its own state: a Merkle tree. That state must stay consistent with Postgres: after each `process_events` call, the set of batches that have a `batch_chain_merkle_path` in Postgres and the set of batches present in the Merkle tree should be the same. Otherwise, eth watch ends up in a crash loop on restart.

Eth watch treats client errors as retryable, so if the Merkle tree was updated and the client then returned an error, `process_events` would return before `batch_chain_merkle_path` was stored. Returning an internal error fixes the issue because eth watch exits and restarts on internal errors.

## Is this a breaking change?

- [ ] Yes
- [x] No

## Operational changes

## Checklist

- [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zkstack dev fmt` and `zkstack dev lint`.
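To make the failure mode concrete, here is a minimal, self-contained sketch of the hazard described above. All names (`Processor`, `ProcessorError`, `tree_leaves`, `persisted`) are illustrative stand-ins, not the real eth-watch API; the point is only that in-memory state is mutated before the DB write, so treating a mid-call failure as retryable leaves the tree ahead of Postgres, and a retry then corrupts it:

```rust
// Illustrative sketch of the consistency hazard (not the real eth-watch types).

#[derive(Debug)]
#[allow(dead_code)] // both variants are shown for exposition
enum ProcessorError {
    /// Retryable: the watcher logs it and calls `process_events` again.
    Transient(String),
    /// Fatal: the watcher exits; on restart, state is rebuilt from the DB.
    Fatal(String),
}

struct Processor {
    /// In-memory Merkle tree stand-in. It survives retries within one process,
    /// but would be rebuilt from the DB after a restart.
    tree_leaves: Vec<u64>,
    /// Batches whose `batch_chain_merkle_path` made it into the DB.
    persisted: Vec<u64>,
}

impl Processor {
    fn process_events(&mut self, batch_root: u64, client_ok: bool) -> Result<(), ProcessorError> {
        // Step 1: mutate in-memory state.
        self.tree_leaves.push(batch_root);

        // Step 2: a client call that is needed to build the proof.
        if !client_ok {
            // Before this PR this surfaced as a client (transient) error, so the
            // watcher retried `process_events` and pushed `batch_root` again.
            return Err(ProcessorError::Transient("client call failed".into()));
        }

        // Step 3: persist the proof for this batch.
        self.persisted.push(batch_root);
        Ok(())
    }
}

fn main() {
    let mut p = Processor { tree_leaves: vec![], persisted: vec![] };
    // First attempt: the client errors *after* the tree was mutated.
    let _ = p.process_events(42, false);
    // Treating that as retryable replays the event against the stale tree:
    let _ = p.process_events(42, true);
    assert_eq!(p.tree_leaves, vec![42, 42]); // tree has diverged from the DB
    assert_eq!(p.persisted, vec![42]);
    // Returning `ProcessorError::Fatal` instead forces a restart, which rebuilds
    // the tree from the DB and keeps the two sets identical.
}
```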
1 parent 77d043f commit 1f668e5

File tree

6 files changed: +127 −44 lines changed

core/node/eth_watch/src/event_processors/appended_chain_batch_root.rs

Lines changed: 27 additions & 9 deletions
```diff
@@ -65,7 +65,8 @@ impl EventProcessor for BatchRootProcessor {
         let mut transaction = storage
             .start_transaction()
             .await
-            .map_err(DalError::generalize)?;
+            .map_err(DalError::generalize)
+            .map_err(EventProcessorError::internal)?;
 
         let grouped_events: Vec<_> = events
             .into_iter()
@@ -113,13 +114,21 @@ impl EventProcessor for BatchRootProcessor {
             }
         });
 
-        let sl_chain_id = self.sl_l2_client.chain_id().await?;
+        let sl_chain_id = self
+            .sl_l2_client
+            .chain_id()
+            .await
+            .context("sl_l2_client.chain_id()")
+            .map_err(EventProcessorError::internal)?;
         for (sl_l1_batch_number, chain_batches) in new_events {
             let chain_agg_proof = self
                 .sl_l2_client
                 .get_chain_log_proof(sl_l1_batch_number, self.l2_chain_id)
-                .await?
-                .context("Missing chain log proof for finalized batch")?;
+                .await
+                .context("sl_l2_client.get_chain_log_proof()")
+                .map_err(EventProcessorError::internal)?
+                .context("Missing chain log proof for finalized batch")
+                .map_err(EventProcessorError::internal)?;
             let chain_proof_vector =
                 Self::chain_proof_vector(sl_l1_batch_number, chain_agg_proof, sl_chain_id);
 
@@ -128,8 +137,10 @@ impl EventProcessor for BatchRootProcessor {
                     .blocks_dal()
                     .get_l1_batch_l2_l1_merkle_root(*batch_number)
                     .await
-                    .map_err(DalError::generalize)?
-                    .context("Missing l2_l1_merkle_root for finalized batch")?;
+                    .map_err(DalError::generalize)
+                    .map_err(EventProcessorError::internal)?
+                    .context("Missing l2_l1_merkle_root for finalized batch")
+                    .map_err(EventProcessorError::internal)?;
                 assert_eq!(root_from_db, *batch_root);
 
                 self.merkle_tree
@@ -141,7 +152,9 @@ impl EventProcessor for BatchRootProcessor {
                 let chain_root_remote = self
                     .sl_l2_client
                     .get_chain_root_l2(sl_l1_batch_number, self.l2_chain_id)
-                    .await?;
+                    .await
+                    .context("sl_l2_client.get_chain_root_l2()")
+                    .map_err(EventProcessorError::internal)?;
                 assert_eq!(
                     chain_root_local,
                     chain_root_remote.unwrap(),
@@ -172,11 +185,16 @@ impl EventProcessor for BatchRootProcessor {
                     .blocks_dal()
                     .set_batch_chain_merkle_path(*batch_number, proof)
                     .await
-                    .map_err(DalError::generalize)?;
+                    .map_err(DalError::generalize)
+                    .map_err(EventProcessorError::internal)?;
             }
         }
 
-        transaction.commit().await.map_err(DalError::generalize)?;
+        transaction
+            .commit()
+            .await
+            .map_err(DalError::generalize)
+            .map_err(EventProcessorError::internal)?;
 
         Ok(events_count)
     }
```

core/node/eth_watch/src/event_processors/decentralized_upgrades.rs

Lines changed: 38 additions & 13 deletions
```diff
@@ -75,38 +75,58 @@ impl EventProcessor for DecentralizedUpgradesEventProcessor {
     ) -> Result<usize, EventProcessorError> {
         let mut upgrades = Vec::new();
         for event in &events {
-            let version = event.topics.get(1).copied().context("missing topic 1")?;
+            let version = event
+                .topics
+                .get(1)
+                .copied()
+                .context("missing topic 1")
+                .map_err(EventProcessorError::internal)?;
             let timestamp: u64 = U256::from_big_endian(&event.data.0)
                 .try_into()
                 .ok()
-                .context("upgrade timestamp is too big")?;
+                .context("upgrade timestamp is too big")
+                .map_err(EventProcessorError::internal)?;
 
             let diamond_cut = self
                 .sl_client
                 .diamond_cut_by_version(version)
-                .await?
-                .context("missing upgrade data on STM")?;
+                .await
+                .map_err(EventProcessorError::client)?
+                .context("missing upgrade data on STM")
+                .map_err(EventProcessorError::internal)?;
 
             let upgrade = ProtocolUpgrade {
                 timestamp,
                 ..ProtocolUpgrade::try_from_diamond_cut(
                     &diamond_cut,
                     self.l1_client.as_ref(),
-                    self.l1_client.get_chain_gateway_upgrade_info().await?,
+                    self.l1_client
+                        .get_chain_gateway_upgrade_info()
+                        .await
+                        .map_err(EventProcessorError::contract_call)?,
                 )
-                .await?
+                .await
+                .map_err(EventProcessorError::internal)?
             };
 
             // Scheduler VK is not present in proposal event. It is hard coded in verifier contract.
             let scheduler_vk_hash = if let Some(address) = upgrade.verifier_address {
-                Some(self.sl_client.scheduler_vk_hash(address).await?)
+                Some(
+                    self.sl_client
+                        .scheduler_vk_hash(address)
+                        .await
+                        .map_err(EventProcessorError::contract_call)?,
+                )
             } else {
                 None
             };
 
             // Scheduler VK is not present in proposal event. It is hard coded in verifier contract.
             let fflonk_scheduler_vk_hash = if let Some(address) = upgrade.verifier_address {
-                self.sl_client.fflonk_scheduler_vk_hash(address).await?
+                self.sl_client
+                    .fflonk_scheduler_vk_hash(address)
+                    .await
+                    .map_err(EventProcessorError::contract_call)?
             } else {
                 None
             };
@@ -135,21 +155,25 @@ impl EventProcessor for DecentralizedUpgradesEventProcessor {
             .protocol_versions_dal()
             .latest_semantic_version()
             .await
-            .map_err(DalError::generalize)?
-            .context("expected some version to be present in DB")?;
+            .map_err(DalError::generalize)
+            .map_err(EventProcessorError::internal)?
+            .context("expected some version to be present in DB")
+            .map_err(EventProcessorError::internal)?;
 
         if upgrade.version > latest_semantic_version {
             let latest_version = storage
                 .protocol_versions_dal()
                 .get_protocol_version_with_latest_patch(latest_semantic_version.minor)
                 .await
-                .map_err(DalError::generalize)?
+                .map_err(DalError::generalize)
+                .map_err(EventProcessorError::internal)?
                 .with_context(|| {
                     format!(
                         "expected minor version {} to be present in DB",
                         latest_semantic_version.minor as u16
                    )
-                })?;
+                })
+                .map_err(EventProcessorError::internal)?;
 
             let new_version = latest_version.apply_upgrade(
                 upgrade,
@@ -168,7 +192,8 @@ impl EventProcessor for DecentralizedUpgradesEventProcessor {
                 .protocol_versions_dal()
                 .save_protocol_version_with_tx(&new_version)
                 .await
-                .map_err(DalError::generalize)?;
+                .map_err(DalError::generalize)
+                .map_err(EventProcessorError::internal)?;
             }
         }
         stage_latency.observe();
```

core/node/eth_watch/src/event_processors/gateway_migration.rs

Lines changed: 8 additions & 2 deletions
```diff
@@ -34,7 +34,12 @@ impl EventProcessor for GatewayMigrationProcessor {
         events: Vec<Log>,
     ) -> Result<usize, EventProcessorError> {
         for event in &events {
-            let main_topic = event.topics.first().copied().context("missing topic 0")?;
+            let main_topic = event
+                .topics
+                .first()
+                .copied()
+                .context("missing topic 0")
+                .map_err(EventProcessorError::internal)?;
             if !self.possible_main_topics.contains(&main_topic) {
                 continue;
             }
@@ -44,7 +49,8 @@ impl EventProcessor for GatewayMigrationProcessor {
                 .topics
                 .get(1)
                 .copied()
-                .context("missing topic 1")?
+                .context("missing topic 1")
+                .map_err(EventProcessorError::internal)?
                 .as_bytes(),
         );
 
```

core/node/eth_watch/src/event_processors/mod.rs

Lines changed: 29 additions & 5 deletions
```diff
@@ -18,19 +18,31 @@ mod priority_ops;
 /// Errors issued by an [`EventProcessor`].
 #[derive(Debug, thiserror::Error)]
 pub(super) enum EventProcessorError {
+    #[error("Fatal: {0:?}")]
+    Fatal(#[from] FatalError),
+    #[error("Transient: {0:?}")]
+    Transient(#[from] TransientError),
+}
+
+#[derive(Debug, thiserror::Error)]
+pub(super) enum FatalError {
     #[error("failed parsing a log into {log_kind}: {source:?}")]
     LogParse {
         log_kind: &'static str,
         #[source]
         source: anyhow::Error,
     },
+    /// Internal errors are considered fatal (i.e., they bubble up and lead to the watcher termination).
+    #[error("internal processing error: {0:?}")]
+    Internal(#[from] anyhow::Error),
+}
+
+#[derive(Debug, thiserror::Error)]
+pub(super) enum TransientError {
     #[error("Eth client error: {0}")]
     Client(#[from] EnrichedClientError),
     #[error("Contract call error: {0}")]
     ContractCall(#[from] ContractCallError),
-    /// Internal errors are considered fatal (i.e., they bubble up and lead to the watcher termination).
-    #[error("internal processing error: {0:?}")]
-    Internal(#[from] anyhow::Error),
 }
 
 #[derive(Debug)]
@@ -41,10 +53,22 @@ pub(super) enum EventsSource {
 
 impl EventProcessorError {
     pub fn log_parse(source: impl Into<anyhow::Error>, log_kind: &'static str) -> Self {
-        Self::LogParse {
+        Self::Fatal(FatalError::LogParse {
             log_kind,
             source: source.into(),
-        }
+        })
+    }
+
+    pub fn internal(source: impl Into<anyhow::Error>) -> Self {
+        Self::Fatal(FatalError::Internal(source.into()))
+    }
+
+    pub fn client(source: impl Into<EnrichedClientError>) -> Self {
+        Self::Transient(TransientError::Client(source.into()))
+    }
+
+    pub fn contract_call(source: impl Into<ContractCallError>) -> Self {
+        Self::Transient(TransientError::ContractCall(source.into()))
     }
 }
 
```
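For reference, the shape of this refactored error type can be reproduced in isolation. The sketch below compiles on its own with stand-in inner errors (`DbError` and `RpcError` are illustrative; the real enums wrap `anyhow::Error`, `EnrichedClientError`, and `ContractCallError`) and shows why the new constructors keep call sites down to a single `.map_err(...)`:

```rust
use thiserror::Error;

#[derive(Debug, Error)]
#[error("db error")]
struct DbError; // stand-in for a generalized `DalError`

#[derive(Debug, Error)]
#[error("rpc error")]
struct RpcError; // stand-in for `EnrichedClientError`

#[derive(Debug, Error)]
enum FatalError {
    #[error("internal processing error: {0}")]
    Internal(#[from] DbError),
}

#[derive(Debug, Error)]
enum TransientError {
    #[error("Eth client error: {0}")]
    Client(#[from] RpcError),
}

#[derive(Debug, Error)]
enum EventProcessorError {
    #[error("Fatal: {0}")]
    Fatal(#[from] FatalError),
    #[error("Transient: {0}")]
    Transient(#[from] TransientError),
}

impl EventProcessorError {
    fn internal(source: impl Into<DbError>) -> Self {
        Self::Fatal(FatalError::Internal(source.into()))
    }
    fn client(source: impl Into<RpcError>) -> Self {
        Self::Transient(TransientError::Client(source.into()))
    }
}

fn main() {
    // Each fallible step classifies its error at the call site...
    let db: Result<(), _> = Err(DbError).map_err(EventProcessorError::internal);
    let rpc: Result<(), _> = Err(RpcError).map_err(EventProcessorError::client);
    // ...so the caller needs a single match to choose restart vs. retry.
    for res in [db, rpc] {
        match res {
            Ok(()) => {}
            Err(EventProcessorError::Fatal(err)) => eprintln!("restart: {err}"),
            Err(EventProcessorError::Transient(err)) => eprintln!("retry: {err}"),
        }
    }
}
```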

core/node/eth_watch/src/event_processors/priority_ops.rs

Lines changed: 7 additions & 2 deletions
```diff
@@ -88,7 +88,11 @@ impl EventProcessor for PriorityOpsEventProcessor {
         let stage_latency = METRICS.poll_eth_node[&PollStage::PersistL1Txs].start();
         APP_METRICS.processed_txs[&TxStage::added_to_mempool()].inc();
         APP_METRICS.processed_l1_txs[&TxStage::added_to_mempool()].inc();
-        let processed_priority_transactions = self.sl_client.get_total_priority_txs().await?;
+        let processed_priority_transactions = self
+            .sl_client
+            .get_total_priority_txs()
+            .await
+            .map_err(EventProcessorError::contract_call)?;
         let ops_to_insert: Vec<&L1Tx> = new_ops
             .iter()
             .take_while(|op| processed_priority_transactions > op.serial_id().0)
@@ -99,7 +103,8 @@ impl EventProcessor for PriorityOpsEventProcessor {
                 .transactions_dal()
                 .insert_transaction_l1(new_op, new_op.eth_block())
                 .await
-                .map_err(DalError::generalize)?;
+                .map_err(DalError::generalize)
+                .map_err(EventProcessorError::internal)?;
         }
         stage_latency.observe();
         if let Some(last_op) = ops_to_insert.last() {
```

core/node/eth_watch/src/lib.rs

Lines changed: 18 additions & 13 deletions
```diff
@@ -162,13 +162,11 @@ impl EthWatch {
                 /* everything went fine */
                 METRICS.eth_poll.inc();
             }
-            Err(EventProcessorError::Internal(err)) => {
-                tracing::error!("Internal error processing new blocks: {err:?}");
-                return Err(err);
+            Err(EventProcessorError::Fatal(err)) => {
+                tracing::error!("Fatal error processing new blocks: {err:?}");
+                return Err(err.into());
             }
-            Err(err) => {
-                // This is an error because otherwise we could potentially miss a priority operation
-                // thus entering priority mode, which is not desired.
+            Err(EventProcessorError::Transient(err)) => {
                 tracing::error!("Failed to process new blocks: {err}");
             }
         }
@@ -188,13 +186,17 @@ impl EthWatch {
             EventsSource::L1 => self.l1_client.as_ref(),
             EventsSource::SL => self.sl_client.as_ref(),
         };
-        let chain_id = client.chain_id().await?;
+        let chain_id = client
+            .chain_id()
+            .await
+            .map_err(EventProcessorError::client)?;
 
         let to_block = if processor.only_finalized_block() {
-            client.finalized_block_number().await?
+            client.finalized_block_number().await
         } else {
-            client.confirmed_block_number().await?
-        };
+            client.confirmed_block_number().await
+        }
+        .map_err(EventProcessorError::client)?;
 
         let from_block = storage
             .eth_watcher_dal()
@@ -204,7 +206,8 @@ impl EthWatch {
                 to_block.saturating_sub(self.event_expiration_blocks),
             )
             .await
-            .map_err(DalError::generalize)?;
+            .map_err(DalError::generalize)
+            .map_err(EventProcessorError::internal)?;
 
         // There are no new blocks so there is nothing to be done
         if from_block > to_block {
@@ -219,7 +222,8 @@ impl EthWatch {
                 processor.topic2(),
                 RETRY_LIMIT,
             )
-            .await?;
+            .await
+            .map_err(EventProcessorError::client)?;
         let processed_events_count = processor
             .process_events(storage, processor_events.clone())
             .await?;
@@ -245,7 +249,8 @@ impl EthWatch {
                 next_block_to_process,
             )
             .await
-            .map_err(DalError::generalize)?;
+            .map_err(DalError::generalize)
+            .map_err(EventProcessorError::internal)?;
         }
 
         Ok(())
```
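The change to the polling loop above is what turns the new classification into behavior. A runnable sketch of that control flow, with `process_new_blocks` faked to fail in different ways (all names here are illustrative, not the real `EthWatch` API):

```rust
// Transient errors are logged and retried on the next tick; fatal errors
// terminate the task so the component restarts from persisted state.

#[derive(Debug)]
enum EventProcessorError {
    Fatal(String),
    Transient(String),
}

// Fake poll: a transient failure, then success, then a fatal failure.
fn process_new_blocks(tick: u32) -> Result<(), EventProcessorError> {
    match tick {
        0 => Err(EventProcessorError::Transient("RPC timeout".into())),
        1 => Ok(()),
        _ => Err(EventProcessorError::Fatal("merkle tree out of sync".into())),
    }
}

fn main() -> Result<(), String> {
    for tick in 0..3 {
        match process_new_blocks(tick) {
            Ok(()) => println!("tick {tick}: everything went fine"),
            Err(EventProcessorError::Transient(err)) => {
                // Keep polling; nothing was half-persisted.
                println!("tick {tick}: failed to process new blocks: {err}");
            }
            Err(EventProcessorError::Fatal(err)) => {
                // Bubble up; the supervisor restarts eth watch from DB state.
                println!("tick {tick}: fatal error processing new blocks: {err}");
                return Err(err);
            }
        }
    }
    Ok(())
}
```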
