Skip to content

Commit 8a633f7

Browse files
committed
feat: relayer api for cctp v2
1 parent dc8e560 commit 8a633f7

20 files changed

Lines changed: 1359 additions & 45 deletions

File tree

.spellcheck/typos.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ extend-exclude = [
2424
"rust/main/chains/hyperlane-sealevel/src/provider/recipient/*.json",
2525
"rust/main/chains/hyperlane-aleo/src/indexer/mock_responses/",
2626
"rust/main/chains/hyperlane-aleo/src/provider/mock_responses/",
27+
"rust/main/chains/hyperlane-radix/src/utils.rs",
2728
"rust/main/chains/hyperlane-starknet/abis/",
2829
"rust/main/lander/src/adapter/chains/aleo/adapter/status/test_fixtures/",
2930
"rust/sealevel/environments/",

rust/main/Cargo.lock

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/main/agents/relayer/Cargo.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,24 @@ tracing-futures.workspace = true
4848
tracing.workspace = true
4949
typetag.workspace = true
5050
uuid.workspace = true
51+
tower-http = { version = "0.6", features = ["cors"] }
5152

5253
hyperlane-base = { path = "../../hyperlane-base", features = ["test-utils"] }
5354
hyperlane-core = { path = "../../hyperlane-core", features = [
5455
"agent",
5556
"async",
5657
"matching-list",
5758
] }
59+
hyperlane-aleo = { path = "../../chains/hyperlane-aleo", optional = true }
5860
hyperlane-ethereum = { path = "../../chains/hyperlane-ethereum" }
61+
hyperlane-cosmos = { path = "../../chains/hyperlane-cosmos" }
62+
hyperlane-fuel = { path = "../../chains/hyperlane-fuel" }
63+
hyperlane-radix = { path = "../../chains/hyperlane-radix" }
5964
hyperlane-sealevel = { path = "../../chains/hyperlane-sealevel" }
65+
hyperlane-starknet = { path = "../../chains/hyperlane-starknet" }
66+
hyperlane-tron = { path = "../../chains/hyperlane-tron" }
6067
solana-sdk.workspace = true
68+
hex.workspace = true
6169
hyperlane-metric = { path = "../../hyperlane-metric" }
6270
hyperlane-operation-verifier = { path = "../../applications/hyperlane-operation-verifier" }
6371
lander = { path = "../../lander" }
@@ -83,4 +91,4 @@ oneline-errors = ["hyperlane-base/oneline-errors"]
8391
color-eyre = ["hyperlane-base/color-eyre"]
8492
test-utils = ["hyperlane-base/test-utils"]
8593
memory-profiling = ["dep:ctrlc", "dep:dhat"]
86-
aleo = ["hyperlane-base/aleo", "lander/aleo"]
94+
aleo = ["dep:hyperlane-aleo", "hyperlane-base/aleo", "lander/aleo"]

rust/main/agents/relayer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#![deny(clippy::arithmetic_side_effects)]
33

44
pub mod msg;
5+
pub mod relay_api;
56

67
mod db_loader;
78
mod merkle_tree;

rust/main/agents/relayer/src/msg/metadata/base.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ pub enum MetadataBuildError {
4747
FastPathError(String),
4848
#[error("Merkle root mismatch ({root}, {canonical_root})")]
4949
MerkleRootMismatch { root: H256, canonical_root: H256 },
50+
/// Attestation is not yet available from the offchain lookup server (transient)
51+
#[error("Attestation pending")]
52+
AttestationPending,
5053
}
5154

5255
#[derive(Clone, Debug, PartialEq, thiserror::Error)]

rust/main/agents/relayer/src/msg/metadata/ccip_read/mod.rs

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -250,12 +250,40 @@ async fn metadata_build(
250250
continue;
251251
}
252252

253-
// if we fail, we want to try the other urls
254-
match fetch_offchain_data(ism_builder, &info, url, origin_tx_hash.clone()).await {
255-
Ok(data) => return Ok(data),
256-
Err(err) => {
257-
tracing::warn!(?ism_address, url, ?err, "Failed to fetch offchain data");
258-
continue;
253+
// Retry this URL while attestation is pending (transient), up to 30 attempts at 1s intervals.
254+
// Move to the next URL only on hard failures.
255+
const MAX_PENDING_RETRIES: u32 = 30;
256+
let mut pending_attempts = 0u32;
257+
loop {
258+
match fetch_offchain_data(ism_builder, &info, url, origin_tx_hash.clone()).await {
259+
Ok(data) => {
260+
tracing::info!(
261+
?ism_address,
262+
url,
263+
origin_tx_hash = ?origin_tx_hash,
264+
attempts = pending_attempts,
265+
"Successfully fetched offchain lookup data"
266+
);
267+
return Ok(data);
268+
}
269+
Err(MetadataBuildError::AttestationPending)
270+
if pending_attempts < MAX_PENDING_RETRIES =>
271+
{
272+
pending_attempts += 1;
273+
tracing::debug!(
274+
?ism_address,
275+
url,
276+
origin_tx_hash = ?origin_tx_hash,
277+
attempt = pending_attempts,
278+
max = MAX_PENDING_RETRIES,
279+
"Attestation pending, retrying in 1s"
280+
);
281+
tokio::time::sleep(Duration::from_secs(1)).await;
282+
}
283+
Err(err) => {
284+
tracing::warn!(?ism_address, url, origin_tx_hash = ?origin_tx_hash, error = ?err, "Failed to fetch offchain data");
285+
break;
286+
}
259287
}
260288
}
261289
}
@@ -331,6 +359,16 @@ async fn fetch_offchain_data(
331359
response = response_body,
332360
"Received response from offchain lookup server"
333361
);
362+
// Check for transient "pending" response before attempting full parse
363+
if let Ok(val) = serde_json::from_str::<serde_json::Value>(&response_body) {
364+
if let Some(err_msg) = val.get("error").and_then(|e| e.as_str()) {
365+
let err_lower = err_msg.to_lowercase();
366+
if err_lower.contains("pending") || err_lower.contains("not found") {
367+
return Err(MetadataBuildError::AttestationPending);
368+
}
369+
}
370+
}
371+
334372
let json: OffchainResponse = serde_json::from_str(&response_body).map_err(|err| {
335373
let error_msg = format!(
336374
"Failed to parse offchain lookup server json response: ({err}) ({response_body})"

rust/main/agents/relayer/src/msg/pending_message.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,6 +845,19 @@ impl PendingMessage {
845845
} else {
846846
warn!("Repreparing message: {}", reason.clone());
847847
}
848+
// Drop the message if it has exceeded its max retry budget.
849+
// This is checked here (rather than relying on calculate_msg_backoff) so that
850+
// low max_retries values (e.g. 0 meaning "no retries") are respected even
851+
// for retry counts that fall into the fixed early backoff arms (1 => 5s, etc.).
852+
if self.num_retries > self.max_retries {
853+
warn!(
854+
message_id = ?self.message.id(),
855+
num_retries = self.num_retries,
856+
max_retries = self.max_retries,
857+
"Message exceeded max retries, dropping"
858+
);
859+
return PendingOperationResult::Drop;
860+
}
848861
PendingOperationResult::Reprepare(reason)
849862
}
850863

@@ -1049,6 +1062,10 @@ impl PendingMessage {
10491062
warn!(?root, ?canonical_root, "Merkle root mismatch");
10501063
self.on_reprepare(Some(err), ReprepareReason::ErrorBuildingMetadata)
10511064
}
1065+
// Handled inside the ccip_read retry loop; should not propagate here
1066+
MetadataBuildError::AttestationPending => {
1067+
self.on_reprepare(Some(err), ReprepareReason::CouldNotFetchMetadata)
1068+
}
10521069
});
10531070
let build_metadata_end = Instant::now();
10541071

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
use eyre::{eyre, Result};
2+
use hyperlane_core::{HyperlaneMessage, Indexer, H256, H512};
3+
use std::{collections::HashMap, sync::Arc};
4+
use tracing::{debug, error, warn};
5+
6+
/// Extract all Hyperlane messages from a transaction hash on a specific chain
7+
pub async fn extract_messages(
8+
indexers: &HashMap<String, Arc<dyn Indexer<HyperlaneMessage>>>,
9+
chain_name: &str,
10+
tx_hash: &str,
11+
) -> Result<Vec<ExtractedMessage>> {
12+
// Get indexer for chain
13+
let indexer = indexers
14+
.get(chain_name)
15+
.ok_or_else(|| eyre!("Chain not found in registry: {}", chain_name))?;
16+
17+
debug!(
18+
chain = %chain_name,
19+
tx_hash = %tx_hash,
20+
"Extracting message from transaction"
21+
);
22+
23+
// Parse tx hash using protocol-specific method
24+
let tx_hash_512 = indexer
25+
.parse_tx_hash(tx_hash)
26+
.map_err(|e| eyre!("Invalid tx hash format: {}", e))?;
27+
28+
// Fetch messages from transaction
29+
let messages_with_meta = indexer
30+
.fetch_logs_by_tx_hash(tx_hash_512)
31+
.await
32+
.map_err(|e| {
33+
error!(
34+
chain = %chain_name,
35+
tx_hash = %tx_hash,
36+
error = ?e,
37+
"Failed to fetch logs from transaction"
38+
);
39+
eyre!("Failed to fetch transaction logs: {}", e)
40+
})?;
41+
42+
// Extract just the messages
43+
let messages: Vec<HyperlaneMessage> = messages_with_meta
44+
.into_iter()
45+
.map(|(indexed_msg, _log_meta)| indexed_msg.inner().clone())
46+
.collect();
47+
48+
if messages.is_empty() {
49+
error!(
50+
chain = %chain_name,
51+
tx_hash = %tx_hash,
52+
"No Hyperlane Dispatch events found in transaction"
53+
);
54+
return Err(eyre!("No Hyperlane Dispatch events found in transaction"));
55+
}
56+
57+
debug!(
58+
chain = %chain_name,
59+
tx_hash = %tx_hash,
60+
message_count = messages.len(),
61+
"Successfully extracted messages from transaction"
62+
);
63+
64+
// Check once per tx whether this is a CCTP fast transfer.
65+
// Errors are treated as false — the relay API will reject the request below.
66+
let is_cctp_v2 = indexer.is_cctp_v2(tx_hash_512).await.unwrap_or_else(|e| {
67+
warn!(
68+
chain = %chain_name,
69+
tx_hash = %tx_hash,
70+
error = ?e,
71+
"Failed to check for CCTP V2 burn event, treating as non-CCTP"
72+
);
73+
false
74+
});
75+
76+
debug!(
77+
chain = %chain_name,
78+
tx_hash = %tx_hash,
79+
is_cctp_v2,
80+
"CCTP V2 burn event check result"
81+
);
82+
83+
// Convert all messages to ExtractedMessage structs
84+
let extracted_messages: Vec<ExtractedMessage> = messages
85+
.into_iter()
86+
.map(|message| {
87+
let origin_domain = message.origin;
88+
let destination_domain = message.destination;
89+
let message_id = message.id();
90+
91+
debug!(
92+
chain = %chain_name,
93+
tx_hash = %tx_hash,
94+
message_id = ?message_id,
95+
origin_domain = origin_domain,
96+
destination_domain = destination_domain,
97+
"Extracted message"
98+
);
99+
100+
ExtractedMessage {
101+
message,
102+
origin_domain,
103+
destination_domain,
104+
message_id,
105+
is_cctp_v2,
106+
tx_hash: tx_hash_512,
107+
}
108+
})
109+
.collect();
110+
111+
Ok(extracted_messages)
112+
}
113+
114+
#[derive(Debug, Clone)]
115+
pub struct ExtractedMessage {
116+
pub message: HyperlaneMessage,
117+
pub origin_domain: u32,
118+
pub destination_domain: u32,
119+
pub message_id: H256,
120+
/// True when the transaction contains a Circle CCTP V2 `DepositForBurn` event.
121+
pub is_cctp_v2: bool,
122+
/// The origin transaction hash, used by the ccip-server to skip GraphQL lookup.
123+
pub tx_hash: H512,
124+
}

0 commit comments

Comments
 (0)