Skip to content

Commit 876bef9

Browse files
authored
Jonas/service lifecycle (#293)
* introduce server that manages joint tasks * add comment * fix container name * update comment * comment * clippy * clippy * error handling * re-initialize on task failur * remove wrapper handle * reinitialize in 'normal' mode * join service tasks * Enhance: Finalize Hyperlane Snapshots (#292)
1 parent 27315b3 commit 876bef9

File tree

4 files changed

+148
-21
lines changed

4 files changed

+148
-21
lines changed

crates/ev-prover/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,3 +114,19 @@ To lint the Protobuf definitions:
114114
cd crates/ev-prover/proto
115115
buf lint
116116
```
117+
118+
## Hyperlane Message Finality
119+
120+
```mermaid
121+
sequenceDiagram;
122+
Reth->>MessageProver: Transactions;
123+
SnapshotStore->>MessageProver: Trusted Snapshot finalized==false;
124+
MessageProver->>SnapshotStore: New Snapshot after Inserts finalized==false;
125+
MessageProver->>SnapshotStore: Trusted Snapshot finalized==true;
126+
```
127+
128+
Finality serves two distinct purposes. Firstly, it can be used to expose the status of message proofs to the user. However the primary purpose is such that errors can be detected when the finalization status is not updated for a snapshot. Currently this will not break the system, as new messages will be proven from the next snapshot onwards, but messages could get lost until a proper retry mechanism is implemented.
129+
130+
So long as the finality status of all snapshots flips to `true` there is nothing to worry about. All snapshots, except for the most recent one, should be finalized at all times. Gaps in finalization indicate that message proof submission was unsuccessful, or that the DB corrupted post submission.
131+
132+
Note that the message prover always takes the last known snapshot, which is expected to be unfinalized, but no such check is enforced and generates a proof using all messages that occurred from the height of the trusted snapshot + 1 all the way to the trusted EV height in ZKISM, aka `committed_height`.

crates/ev-prover/src/prover/programs/combined.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ impl EvCombinedProver {
289289
}
290290

291291
let namespace = self.app.namespace;
292-
for height in scan_start..latest_head {
292+
for height in scan_start..=latest_head {
293293
if !self.is_empty_block(height, namespace).await? {
294294
// Ensure batch size stays within allowed range
295295
let blocks_elapsed = height.saturating_sub(trusted_celestia_height);
@@ -323,10 +323,13 @@ impl EvCombinedProver {
323323
let msg = MsgUpdateZkExecutionIsm::new(id, proof.bytes(), public_values, signer);
324324

325325
info!("Updating ZKISM on Celestia...");
326-
let res = self.app.ism_client.send_tx(msg).await?;
327-
assert!(res.success);
326+
let response = self.app.ism_client.send_tx(msg).await?;
327+
if !response.success {
328+
error!("Failed to submit state transition proof to ZKISM: {:?}", response);
329+
return Err(anyhow::anyhow!("Failed to submit state transition proof to ZKISM"));
330+
}
328331

329-
info!("[Done] Proof tx submitted to ism with hash: {}", res.tx_hash);
332+
info!("[Done] Proof tx submitted to ism with hash: {}", response.tx_hash);
330333

331334
Ok(())
332335
}

crates/ev-prover/src/prover/programs/message.rs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -221,15 +221,16 @@ impl HyperlaneMessageProver {
221221
&self,
222222
evm_provider: &DefaultProvider,
223223
indexer: &mut HyperlaneIndexer,
224-
height: u64,
224+
committed_height: u64,
225225
proof: EIP1186AccountProofResponse,
226226
state_root: FixedBytes<32>,
227227
ism_client: &CelestiaIsmClient,
228228
) -> Result<()> {
229229
// generate a new proof for all messages that occurred since the last trusted height, inserting into the last snapshot
230230
// then save new snapshot
231-
let mut snapshot = self.snapshot_store.get_snapshot(self.snapshot_store.current_index()?)?;
232-
if snapshot.height == height {
231+
let trusted_snapshot_index = self.snapshot_store.current_index()?;
232+
let mut snapshot = self.snapshot_store.get_snapshot(trusted_snapshot_index)?;
233+
if snapshot.height == committed_height {
233234
debug!("No new ev blocks so no new messages to prove");
234235
return Ok(());
235236
}
@@ -239,15 +240,15 @@ impl HyperlaneMessageProver {
239240
.address(indexer.contract_address)
240241
.event(&Dispatch::id())
241242
.from_block(start_height)
242-
.to_block(height);
243+
.to_block(committed_height);
243244

244245
// run the indexer to get all messages that occurred since the last trusted height
245246
indexer
246247
.index(self.message_store.clone(), Arc::new(evm_provider.clone()))
247248
.await?;
248249

249250
let mut messages: Vec<StoredHyperlaneMessage> = Vec::new();
250-
for block in start_height..=height {
251+
for block in start_height..=committed_height {
251252
messages.extend(self.message_store.get_by_block(block)?);
252253
}
253254

@@ -281,17 +282,29 @@ impl HyperlaneMessageProver {
281282

282283
let message_proof_msg = MsgSubmitMessages::new(
283284
self.ctx.ism_id.clone(),
284-
height,
285+
committed_height,
285286
message_proof.0.bytes(),
286287
message_proof.0.public_values.as_slice().to_vec(),
287288
ism_client.signer_address().to_string(),
288289
);
289290

291+
// store the unfinalized snapshot
292+
snapshot.height = committed_height;
293+
let snapshot_index = self.snapshot_store.current_index()? + 1;
294+
self.snapshot_store.insert_snapshot(snapshot_index, snapshot)?;
290295
info!("Submitting Hyperlane tree proof to ZKISM...");
291296
let response = ism_client.send_tx(message_proof_msg).await?;
292297

293-
assert!(response.success);
298+
if !response.success {
299+
error!("Failed to submit Hyperlane tree proof to ZKISM: {:?}", response);
300+
return Err(anyhow::anyhow!("Failed to submit Hyperlane tree proof to ZKISM"));
301+
}
294302
info!("[Done] ZKISM was updated successfully");
303+
self.proof_store
304+
.store_membership_proof(committed_height, &message_proof.0, &message_proof.1)
305+
.await?;
306+
307+
self.snapshot_store.finalize_snapshot(trusted_snapshot_index)?;
295308

296309
info!("Relaying verified Hyperlane messages to Celestia...");
297310
// submit all now verified messages to hyperlane
@@ -305,18 +318,13 @@ impl HyperlaneMessageProver {
305318
message_hex,
306319
);
307320
let response = ism_client.send_tx(msg).await?;
308-
assert!(response.success);
321+
if !response.success {
322+
error!("Failed to relay Hyperlane message to Celestia: {:?}", response);
323+
return Err(anyhow::anyhow!("Failed to relay Hyperlane message to Celestia"));
324+
}
309325
}
310326
info!("[Done] Tia was bridged back to Celestia");
311327

312-
self.proof_store
313-
.store_membership_proof(height, &message_proof.0, &message_proof.1)
314-
.await?;
315-
316-
snapshot.height = height;
317-
self.snapshot_store
318-
.insert_snapshot(self.snapshot_store.current_index()? + 1, snapshot)?;
319-
320328
Ok(())
321329
}
322330
}

crates/storage/src/hyperlane/snapshot.rs

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,23 @@ use std::sync::{Arc, RwLock};
1111

1212
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1313
pub struct HyperlaneSnapshot {
14+
// the trusted EV height in the ZKISM
1415
pub height: u64,
16+
// the Hyperlane Message Tree e.g. Snapshot
1517
pub tree: MerkleTree,
18+
// whether this Snapshot has been finalized
19+
pub finalized: bool,
1620
}
1721
impl HyperlaneSnapshot {
1822
pub fn new(height: u64, tree: MerkleTree) -> HyperlaneSnapshot {
19-
HyperlaneSnapshot { height, tree }
23+
HyperlaneSnapshot {
24+
height,
25+
tree,
26+
finalized: false,
27+
}
28+
}
29+
pub fn finalize(&mut self) {
30+
self.finalized = true;
2031
}
2132
}
2233

@@ -53,6 +64,7 @@ impl HyperlaneSnapshotStore {
5364
Ok(vec![ColumnFamilyDescriptor::new("snapshots", Options::default())])
5465
}
5566

67+
/// Insert a Hyperlane Snapshot into the database
5668
pub fn insert_snapshot(&self, index: u64, snapshot: HyperlaneSnapshot) -> Result<()> {
5769
// Serialize outside the lock to minimize lock duration
5870
let serialized = bincode::serialize(&snapshot).context("Failed to serialize snapshot")?;
@@ -70,6 +82,7 @@ impl HyperlaneSnapshotStore {
7082
Ok(())
7183
}
7284

85+
/// Get a Hyperlane Snapshot by index
7386
pub fn get_snapshot(&self, index: u64) -> Result<HyperlaneSnapshot> {
7487
let read_lock = self.db.read().map_err(|e| anyhow::anyhow!("lock error: {e}"))?;
7588
let cf = read_lock.cf_handle("snapshots").context("Missing CF")?;
@@ -88,6 +101,49 @@ impl HyperlaneSnapshotStore {
88101
Ok(snapshot)
89102
}
90103

104+
/// Get the latest pending snapshot, we expect only the most recent snapshot to be unfinalized
105+
pub fn get_pending_snapshot(&self) -> Result<Option<(u64, HyperlaneSnapshot)>> {
106+
let read_lock = self
107+
.db
108+
.read()
109+
.map_err(|e| anyhow::anyhow!("Failed to acquire read lock: {e}"))?;
110+
let cf = read_lock.cf_handle("snapshots").context("Missing CF")?;
111+
let mut iter = read_lock.iterator_cf(cf, IteratorMode::End);
112+
while let Some(Ok((k, v))) = iter.next() {
113+
if k.len() != 8 {
114+
continue;
115+
}
116+
let mut buf = [0u8; 8];
117+
buf.copy_from_slice(&k);
118+
let index = u64::from_be_bytes(buf);
119+
let mut snapshot: HyperlaneSnapshot = bincode::deserialize(&v).context("Failed to deserialize snapshot")?;
120+
for h in snapshot.tree.branch.iter_mut() {
121+
if h.is_empty() {
122+
*h = ZERO_BYTES.to_string();
123+
}
124+
}
125+
if !snapshot.finalized {
126+
return Ok(Some((index, snapshot)));
127+
}
128+
}
129+
Ok(None)
130+
}
131+
132+
/// Finalize a Hyperlane Snapshot after successful proof submission
133+
pub fn finalize_snapshot(&self, index: u64) -> Result<()> {
134+
let mut snapshot = self
135+
.get_snapshot(index)
136+
.with_context(|| format!("Snapshot at index {index} not found"))?;
137+
if snapshot.finalized {
138+
return Err(anyhow::anyhow!(
139+
"Tried to finalize a finalized snapshot at index {index}"
140+
));
141+
}
142+
snapshot.finalized = true;
143+
self.insert_snapshot(index, snapshot)
144+
}
145+
146+
/// Get the next insert index for the Hyperlane Snapshot store
91147
pub fn current_index(&self) -> Result<u64> {
92148
let read_lock = self
93149
.db
@@ -104,6 +160,7 @@ impl HyperlaneSnapshotStore {
104160
}
105161
}
106162

163+
/// Reset the database by dropping the snapshots column family and creating a new one
107164
pub fn reset_db(&self) -> Result<()> {
108165
let mut write_lock = self
109166
.db
@@ -115,3 +172,46 @@ impl HyperlaneSnapshotStore {
115172
Ok(())
116173
}
117174
}
175+
176+
#[cfg(test)]
177+
mod tests {
178+
use super::*;
179+
180+
#[test]
181+
fn test_insert_snapshot() {
182+
let store = HyperlaneSnapshotStore::new(tempfile::tempdir().unwrap(), None).unwrap();
183+
let snapshot = HyperlaneSnapshot::new(0, MerkleTree::default());
184+
store.insert_snapshot(0, snapshot).unwrap();
185+
}
186+
#[test]
187+
fn test_get_snapshot() {
188+
let store = HyperlaneSnapshotStore::new(tempfile::tempdir().unwrap(), None).unwrap();
189+
let snapshot = HyperlaneSnapshot::new(0, MerkleTree::default());
190+
store.insert_snapshot(0, snapshot.clone()).unwrap();
191+
let retrieved_snapshot = store.get_snapshot(0).unwrap();
192+
assert_eq!(retrieved_snapshot, snapshot);
193+
}
194+
#[test]
195+
fn test_get_pending_snapshot() {
196+
let store = HyperlaneSnapshotStore::new(tempfile::tempdir().unwrap(), None).unwrap();
197+
let first_snapshot = HyperlaneSnapshot::new(0, MerkleTree::default());
198+
let second_snapshot = HyperlaneSnapshot::new(1, MerkleTree::default());
199+
let third_snapshot = HyperlaneSnapshot::new(2, MerkleTree::default());
200+
store.insert_snapshot(0, first_snapshot.clone()).unwrap();
201+
store.insert_snapshot(1, second_snapshot.clone()).unwrap();
202+
store.insert_snapshot(2, third_snapshot.clone()).unwrap();
203+
store.finalize_snapshot(0).unwrap();
204+
store.finalize_snapshot(1).unwrap();
205+
let retrieved_snapshot = store.get_pending_snapshot().unwrap();
206+
assert_eq!(retrieved_snapshot, Some((2, third_snapshot)));
207+
}
208+
#[test]
209+
fn test_finalize_snapshot() {
210+
let store = HyperlaneSnapshotStore::new(tempfile::tempdir().unwrap(), None).unwrap();
211+
let snapshot = HyperlaneSnapshot::new(0, MerkleTree::default());
212+
store.insert_snapshot(0, snapshot.clone()).unwrap();
213+
store.finalize_snapshot(0).unwrap();
214+
let retrieved_snapshot = store.get_snapshot(0).unwrap();
215+
assert!(retrieved_snapshot.finalized);
216+
}
217+
}

0 commit comments

Comments
 (0)