Skip to content

Commit c0034f9

Browse files
author
Nam's Office Computer
committed
fix(validator): quorum safety tree reads
1 parent c9d9b6d commit c0034f9

1 file changed

Lines changed: 218 additions & 20 deletions

File tree

rust/main/agents/validator/src/validator.rs

Lines changed: 218 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use axum::Router;
55
use derive_more::AsRef;
66
use ethers::utils::keccak256;
77
use eyre::{eyre, Result};
8-
use futures_util::future::try_join_all;
8+
use futures_util::future::{join_all, try_join_all};
99
use itertools::Itertools;
1010
use serde::Serialize;
1111
use tokio::{task::JoinHandle, time::sleep};
@@ -25,10 +25,10 @@ use hyperlane_base::{
2525
RuntimeMetrics, SequenceAwareLogStore, SequencedDataContractSync,
2626
};
2727
use hyperlane_core::{
28-
rpc_clients::RPC_RETRY_SLEEP_DURATION, Announcement, ChainResult, CheckpointAtBlock,
29-
HyperlaneChain, HyperlaneContract, HyperlaneDomain, HyperlaneSigner, HyperlaneSignerExt,
30-
IncrementalMerkleAtBlock, Mailbox, MerkleTreeHook, MerkleTreeInsertion, ReorgPeriod, TxOutcome,
31-
ValidatorAnnounce, H256, U256,
28+
rpc_clients::RPC_RETRY_SLEEP_DURATION, Announcement, ChainCommunicationError, ChainResult,
29+
CheckpointAtBlock, HyperlaneChain, HyperlaneContract, HyperlaneDomain, HyperlaneSigner,
30+
HyperlaneSignerExt, IncrementalMerkleAtBlock, Mailbox, MerkleTreeHook, MerkleTreeInsertion,
31+
ReorgPeriod, TxOutcome, ValidatorAnnounce, H256, U256,
3232
};
3333
use hyperlane_ethereum::{
3434
self as h_eth, RpcConnectionConf, Signers, SingletonSigner, SingletonSignerHandle,
@@ -54,7 +54,7 @@ struct ValidatorSafetyMerkleTreeHook {
5454
#[async_trait]
5555
impl MerkleTreeHook for ValidatorSafetyMerkleTreeHook {
5656
async fn tree(&self, reorg_period: &ReorgPeriod) -> ChainResult<IncrementalMerkleAtBlock> {
57-
self.fallback.tree(reorg_period).await
57+
self.safety.tree(reorg_period).await
5858
}
5959

6060
async fn count(&self, reorg_period: &ReorgPeriod) -> ChainResult<u32> {
@@ -73,6 +73,119 @@ impl MerkleTreeHook for ValidatorSafetyMerkleTreeHook {
7373
}
7474
}
7575

76+
#[derive(Debug)]
77+
struct ValidatorTronSafetyMerkleTreeHook {
78+
fallback: Arc<dyn MerkleTreeHook>,
79+
safety_hooks: Vec<Arc<dyn MerkleTreeHook>>,
80+
}
81+
82+
impl ValidatorTronSafetyMerkleTreeHook {
83+
fn quorum_threshold(&self) -> usize {
84+
self.safety_hooks.len() / 2 + 1
85+
}
86+
87+
fn select_quorum_result<T: Clone>(
88+
&self,
89+
results: Vec<ChainResult<T>>,
90+
matches: impl Fn(&T, &T) -> bool,
91+
context: &str,
92+
) -> ChainResult<T> {
93+
let mut oks = Vec::new();
94+
let mut first_err = None;
95+
96+
for result in results {
97+
match result {
98+
Ok(value) => oks.push(value),
99+
Err(err) => {
100+
if first_err.is_none() {
101+
first_err = Some(err);
102+
}
103+
}
104+
}
105+
}
106+
107+
for candidate in &oks {
108+
if oks.iter().filter(|other| matches(candidate, other)).count()
109+
>= self.quorum_threshold()
110+
{
111+
return Ok(candidate.clone());
112+
}
113+
}
114+
115+
Err(first_err.unwrap_or_else(|| ChainCommunicationError::from_other_str(context)))
116+
}
117+
}
118+
119+
#[async_trait]
120+
impl MerkleTreeHook for ValidatorTronSafetyMerkleTreeHook {
121+
async fn tree(&self, reorg_period: &ReorgPeriod) -> ChainResult<IncrementalMerkleAtBlock> {
122+
let results = join_all(self.safety_hooks.iter().cloned().map(|hook| {
123+
let reorg_period = reorg_period.clone();
124+
async move { hook.tree(&reorg_period).await }
125+
}))
126+
.await;
127+
128+
self.select_quorum_result(
129+
results,
130+
|a, b| a.tree == b.tree && a.block_height == b.block_height,
131+
"Failed to reach quorum for tron merkle tree",
132+
)
133+
}
134+
135+
async fn count(&self, reorg_period: &ReorgPeriod) -> ChainResult<u32> {
136+
self.fallback.count(reorg_period).await
137+
}
138+
139+
async fn latest_checkpoint(
140+
&self,
141+
reorg_period: &ReorgPeriod,
142+
) -> ChainResult<CheckpointAtBlock> {
143+
let results = join_all(self.safety_hooks.iter().cloned().map(|hook| {
144+
let reorg_period = reorg_period.clone();
145+
async move { hook.latest_checkpoint(&reorg_period).await }
146+
}))
147+
.await;
148+
149+
self.select_quorum_result(
150+
results,
151+
|a, b| a.checkpoint == b.checkpoint && a.block_height == b.block_height,
152+
"Failed to reach quorum for tron latest_checkpoint",
153+
)
154+
}
155+
156+
async fn latest_checkpoint_at_block(&self, height: u64) -> ChainResult<CheckpointAtBlock> {
157+
let results = join_all(
158+
self.safety_hooks
159+
.iter()
160+
.cloned()
161+
.map(|hook| async move { hook.latest_checkpoint_at_block(height).await }),
162+
)
163+
.await;
164+
165+
self.select_quorum_result(
166+
results,
167+
|a, b| a.checkpoint == b.checkpoint && a.block_height == b.block_height,
168+
"Failed to reach quorum for tron latest_checkpoint_at_block",
169+
)
170+
}
171+
}
172+
173+
impl HyperlaneChain for ValidatorTronSafetyMerkleTreeHook {
174+
fn domain(&self) -> &HyperlaneDomain {
175+
self.fallback.domain()
176+
}
177+
178+
fn provider(&self) -> Box<dyn hyperlane_core::HyperlaneProvider> {
179+
self.fallback.provider()
180+
}
181+
}
182+
183+
impl HyperlaneContract for ValidatorTronSafetyMerkleTreeHook {
184+
fn address(&self) -> H256 {
185+
self.fallback.address()
186+
}
187+
}
188+
76189
impl HyperlaneChain for ValidatorSafetyMerkleTreeHook {
77190
fn domain(&self) -> &HyperlaneDomain {
78191
self.fallback.domain()
@@ -231,20 +344,14 @@ impl BaseAgent for Validator {
231344

232345
let mailbox = origin_chain_conf.build_mailbox(&metrics).await?;
233346

234-
let merkle_tree_hook = if matches!(
235-
origin_chain_conf.connection,
236-
ChainConnectionConf::Ethereum(_)
237-
) {
238-
let fallback_hook = fallback_origin_chain_conf
239-
.build_merkle_tree_hook(&metrics)
240-
.await?;
241-
let safety_hook = safety_origin_chain_conf
242-
.build_merkle_tree_hook(&metrics)
243-
.await?;
244-
Box::new(ValidatorSafetyMerkleTreeHook {
245-
fallback: fallback_hook.into(),
246-
safety: safety_hook.into(),
247-
}) as Box<dyn MerkleTreeHook>
347+
let merkle_tree_hook = if Self::validator_uses_split_safety_hook(&origin_chain_conf) {
348+
Self::build_validator_merkle_tree_hook(
349+
&origin_chain_conf,
350+
&fallback_origin_chain_conf,
351+
&safety_origin_chain_conf,
352+
&metrics,
353+
)
354+
.await?
248355
} else {
249356
settings
250357
.build_merkle_tree_hook(&settings.origin_chain, &metrics)
@@ -422,6 +529,73 @@ impl BaseAgent for Validator {
422529
}
423530

424531
impl Validator {
532+
fn validator_uses_split_safety_hook(origin_chain_conf: &ChainConf) -> bool {
533+
matches!(
534+
origin_chain_conf.connection,
535+
ChainConnectionConf::Ethereum(_) | ChainConnectionConf::Tron(_)
536+
)
537+
}
538+
539+
async fn build_validator_merkle_tree_hook(
540+
origin_chain_conf: &ChainConf,
541+
fallback_origin_chain_conf: &ChainConf,
542+
safety_origin_chain_conf: &ChainConf,
543+
metrics: &CoreMetrics,
544+
) -> ChainResult<Box<dyn MerkleTreeHook>> {
545+
match &origin_chain_conf.connection {
546+
ChainConnectionConf::Ethereum(_) => {
547+
let fallback_hook = fallback_origin_chain_conf
548+
.build_merkle_tree_hook(metrics)
549+
.await?;
550+
let safety_hook = safety_origin_chain_conf
551+
.build_merkle_tree_hook(metrics)
552+
.await?;
553+
Ok(Box::new(ValidatorSafetyMerkleTreeHook {
554+
fallback: fallback_hook.into(),
555+
safety: safety_hook.into(),
556+
}) as Box<dyn MerkleTreeHook>)
557+
}
558+
ChainConnectionConf::Tron(_) => {
559+
let fallback_hook = fallback_origin_chain_conf
560+
.build_merkle_tree_hook(metrics)
561+
.await?;
562+
let safety_hooks =
563+
Self::build_validator_tron_safety_hooks(origin_chain_conf, metrics).await?;
564+
Ok(Box::new(ValidatorTronSafetyMerkleTreeHook {
565+
fallback: fallback_hook.into(),
566+
safety_hooks,
567+
}) as Box<dyn MerkleTreeHook>)
568+
}
569+
_ => unreachable!("validator split safety hook only supports ethereum and tron"),
570+
}
571+
}
572+
573+
async fn build_validator_tron_safety_hooks(
574+
origin_chain_conf: &ChainConf,
575+
metrics: &CoreMetrics,
576+
) -> ChainResult<Vec<Arc<dyn MerkleTreeHook>>> {
577+
let ChainConnectionConf::Tron(conn) = &origin_chain_conf.connection else {
578+
unreachable!("tron safety hooks only supported for tron chains");
579+
};
580+
581+
let mut safety_hooks = Vec::new();
582+
let wallet_solidity_urls = if conn.wallet_solidity_urls.is_empty() {
583+
conn.wallet_urls.clone()
584+
} else {
585+
conn.wallet_solidity_urls.clone()
586+
};
587+
588+
for wallet_solidity_url in wallet_solidity_urls {
589+
let mut chain_conf = origin_chain_conf.clone();
590+
if let ChainConnectionConf::Tron(updated_conn) = &mut chain_conf.connection {
591+
updated_conn.wallet_solidity_urls = vec![wallet_solidity_url];
592+
}
593+
safety_hooks.push(chain_conf.build_merkle_tree_hook(metrics).await?.into());
594+
}
595+
596+
Ok(safety_hooks)
597+
}
598+
425599
fn validator_chain_conf_with_fallback_rpc(origin_chain_conf: &ChainConf) -> ChainConf {
426600
Self::validator_chain_conf_with_rpc_connection(origin_chain_conf, |urls| {
427601
RpcConnectionConf::HttpFallback { urls }
@@ -443,6 +617,18 @@ impl Validator {
443617
let mut updated_conn: h_eth::ConnectionConf = conn.clone();
444618
updated_conn.rpc_connection = build_rpc_connection(conn.rpc_urls());
445619
chain_conf.connection = ChainConnectionConf::Ethereum(updated_conn);
620+
} else if let ChainConnectionConf::Tron(conn) = &origin_chain_conf.connection {
621+
let mut updated_conn = conn.clone();
622+
let rpc_urls = match build_rpc_connection(conn.rpc_urls.clone()) {
623+
RpcConnectionConf::HttpFallback { urls }
624+
| RpcConnectionConf::HttpQuorum { urls } => urls,
625+
RpcConnectionConf::Http { url } => vec![url],
626+
RpcConnectionConf::Ws { .. } => {
627+
unreachable!("validator split rpc does not support ws")
628+
}
629+
};
630+
updated_conn.rpc_urls = rpc_urls;
631+
chain_conf.connection = ChainConnectionConf::Tron(updated_conn);
446632
}
447633
chain_conf
448634
}
@@ -835,6 +1021,8 @@ mod tests {
8351021
.return_const(H256::from_low_u64_be(11));
8361022
fallback.expect_provider().never();
8371023
fallback.expect_count().once().return_once(|_| Ok(3));
1024+
fallback.expect_tree().never();
1025+
fallback.expect_latest_checkpoint().never();
8381026
fallback.expect_latest_checkpoint_at_block().never();
8391027

8401028
let mut safety = MockMerkleTreeHook::new();
@@ -844,6 +1032,12 @@ mod tests {
8441032
.return_const(H256::from_low_u64_be(11));
8451033
safety.expect_provider().never();
8461034
safety.expect_count().never();
1035+
safety.expect_tree().once().return_once(|_| {
1036+
Ok(IncrementalMerkleAtBlock {
1037+
tree: Default::default(),
1038+
block_height: Some(123),
1039+
})
1040+
});
8471041
safety
8481042
.expect_latest_checkpoint()
8491043
.once()
@@ -869,6 +1063,10 @@ mod tests {
8691063
safety: Arc::new(safety),
8701064
};
8711065

1066+
assert_eq!(
1067+
hook.tree(&ReorgPeriod::None).await.unwrap().block_height,
1068+
Some(123)
1069+
);
8721070
assert_eq!(hook.count(&ReorgPeriod::None).await.unwrap(), 3);
8731071
assert_eq!(
8741072
hook.latest_checkpoint(&ReorgPeriod::None)

0 commit comments

Comments
 (0)