Skip to content

Commit 7e9d7da

Browse files
committed
fix: deduplicate rpc connections
1 parent af87b48 commit 7e9d7da

2 files changed

Lines changed: 36 additions & 36 deletions

File tree

rust/main/agents/relayer/src/relayer.rs

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -641,25 +641,11 @@ impl Relayer {
641641
window_secs, "Initialized relay API rate limiter"
642642
);
643643

644-
let mut indexers: HashMap<String, Arc<dyn Indexer<HyperlaneMessage>>> = HashMap::new();
645-
info!(
646-
origin_count = self.origins.len(),
647-
"Setting up relay API indexers"
648-
);
649-
for (domain, origin) in &self.origins {
650-
match origin
651-
.chain_conf
652-
.build_message_indexer(&self.core_metrics, false)
653-
.await
654-
{
655-
Ok(indexer) => {
656-
indexers.insert(domain.name().to_string(), Arc::new(indexer));
657-
}
658-
Err(e) => {
659-
error!(domain = %domain.name(), error = ?e, "Failed to create mailbox indexer for relay API");
660-
}
661-
}
662-
}
644+
let indexers: HashMap<String, Arc<dyn Indexer<HyperlaneMessage>>> = self
645+
.origins
646+
.iter()
647+
.map(|(domain, origin)| (domain.name().to_string(), origin.message_indexer.clone()))
648+
.collect();
663649

664650
Some(
665651
RelayApiState::new(

rust/main/agents/relayer/src/relayer/origin.rs

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ use hyperlane_base::{
1111
};
1212
use hyperlane_core::{
1313
HyperlaneDomain, HyperlaneLogStore, HyperlaneMessage, HyperlaneSequenceAwareIndexerStoreReader,
14-
HyperlaneWatermarkedLogStore, InterchainGasPayment, MerkleTreeInsertion, ValidatorAnnounce,
14+
HyperlaneWatermarkedLogStore, Indexer, InterchainGasPayment, MerkleTreeInsertion,
15+
ValidatorAnnounce,
1516
};
1617
use tokio::sync::RwLock;
1718

@@ -31,6 +32,9 @@ pub struct Origin {
3132
pub gas_payment_enforcer: Arc<RwLock<GasPaymentEnforcer>>,
3233
pub prover_sync: Arc<RwLock<MerkleTreeBuilder>>,
3334
pub message_sync: MessageSync,
35+
/// The underlying mailbox indexer shared with `message_sync`. Exposed so the relay
36+
/// API can reuse the existing RPC connection instead of opening a new one.
37+
pub message_indexer: Arc<dyn Indexer<HyperlaneMessage>>,
3438
pub interchain_gas_payment_sync: Option<InterchainGasPaymentSync>,
3539
pub merkle_tree_hook_sync: MerkleTreeHookSync,
3640
}
@@ -128,7 +132,7 @@ impl Factory for OriginFactory {
128132
};
129133

130134
let hyperlane_db = Arc::new(db.clone());
131-
let message_sync = {
135+
let (message_sync, message_indexer) = {
132136
let start_entity_init = Instant::now();
133137
let res = self
134138
.init_message_sync(&domain, chain_conf, hyperlane_db.clone())
@@ -176,6 +180,7 @@ impl Factory for OriginFactory {
176180
gas_payment_enforcer: Arc::new(RwLock::new(gas_payment_enforcer)),
177181
prover_sync: Arc::new(RwLock::new(prover_sync)),
178182
message_sync,
183+
message_indexer,
179184
interchain_gas_payment_sync,
180185
merkle_tree_hook_sync,
181186
};
@@ -222,8 +227,9 @@ impl OriginFactory {
222227
domain: &HyperlaneDomain,
223228
chain_conf: &ChainConf,
224229
db: Arc<HyperlaneRocksDB>,
225-
) -> Result<MessageSync, FactoryError> {
226-
match HyperlaneMessage::indexing_cursor(domain.domain_protocol()) {
230+
) -> Result<(MessageSync, Arc<dyn Indexer<HyperlaneMessage>>), FactoryError> {
231+
let (sync, seq_indexer) = match HyperlaneMessage::indexing_cursor(domain.domain_protocol())
232+
{
227233
CursorType::SequenceAware => Self::build_sequenced_contract_sync(
228234
domain,
229235
chain_conf,
@@ -234,7 +240,7 @@ impl OriginFactory {
234240
self.tx_id_indexing_enabled,
235241
)
236242
.await
237-
.map(|r| r as Arc<dyn ContractSyncer<_>>)
243+
.map(|(r, i)| (r as Arc<dyn ContractSyncer<_>>, i))
238244
.map_err(|err| FactoryError::MessageSync(domain.to_string(), err.to_string())),
239245
CursorType::RateLimited => Self::build_watermark_contract_sync(
240246
domain,
@@ -246,9 +252,13 @@ impl OriginFactory {
246252
self.tx_id_indexing_enabled,
247253
)
248254
.await
249-
.map(|r| r as Arc<dyn ContractSyncer<_>>)
255+
.map(|(r, i)| (r as Arc<dyn ContractSyncer<_>>, i))
250256
.map_err(|err| FactoryError::MessageSync(domain.to_string(), err.to_string())),
251-
}
257+
}?;
258+
// Arc<dyn SequenceAwareIndexer<T>> implements Indexer<T> via auto_impl(Arc);
259+
// wrap in an outer Arc to satisfy Arc<dyn Indexer<T>>.
260+
let indexer: Arc<dyn Indexer<HyperlaneMessage>> = Arc::new(seq_indexer);
261+
Ok((sync, indexer))
252262
}
253263

254264
async fn init_igp_sync(
@@ -268,7 +278,7 @@ impl OriginFactory {
268278
false,
269279
)
270280
.await
271-
.map(|r| r as Arc<dyn ContractSyncer<_>>)
281+
.map(|(r, _)| r as Arc<dyn ContractSyncer<_>>)
272282
.map_err(|err| {
273283
FactoryError::InterchainGasPaymentSync(domain.to_string(), err.to_string())
274284
}),
@@ -282,7 +292,7 @@ impl OriginFactory {
282292
false,
283293
)
284294
.await
285-
.map(|r| r as Arc<dyn ContractSyncer<_>>)
295+
.map(|(r, _)| r as Arc<dyn ContractSyncer<_>>)
286296
.map_err(|err| {
287297
FactoryError::InterchainGasPaymentSync(domain.to_string(), err.to_string())
288298
}),
@@ -306,7 +316,7 @@ impl OriginFactory {
306316
false,
307317
)
308318
.await
309-
.map(|r| r as Arc<dyn ContractSyncer<_>>)
319+
.map(|(r, _)| r as Arc<dyn ContractSyncer<_>>)
310320
.map_err(|err| FactoryError::MerkleTreeHookSync(domain.to_string(), err.to_string())),
311321
CursorType::RateLimited => Self::build_watermark_contract_sync(
312322
domain,
@@ -318,7 +328,7 @@ impl OriginFactory {
318328
false,
319329
)
320330
.await
321-
.map(|r| r as Arc<dyn ContractSyncer<_>>)
331+
.map(|(r, _)| r as Arc<dyn ContractSyncer<_>>)
322332
.map_err(|err| FactoryError::MerkleTreeHookSync(domain.to_string(), err.to_string())),
323333
}
324334
}
@@ -331,7 +341,7 @@ impl OriginFactory {
331341
store: Arc<S>,
332342
advanced_log_meta: bool,
333343
broadcast_sender_enabled: bool,
334-
) -> eyre::Result<Arc<SequencedDataContractSync<T>>>
344+
) -> eyre::Result<(Arc<SequencedDataContractSync<T>>, SequenceIndexer<T>)>
335345
where
336346
T: Indexable + Debug,
337347
SequenceIndexer<T>: TryFromWithMetrics<ChainConf>,
@@ -341,13 +351,15 @@ impl OriginFactory {
341351
let indexer =
342352
SequenceIndexer::<T>::try_from_with_metrics(chain_conf, metrics, advanced_log_meta)
343353
.await?;
344-
Ok(Arc::new(ContractSync::new(
354+
let indexer_clone = indexer.clone();
355+
let sync = Arc::new(ContractSync::new(
345356
domain.clone(),
346357
store.clone() as SequenceAwareLogStore<_>,
347358
indexer,
348359
sync_metrics.clone(),
349360
broadcast_sender_enabled,
350-
)))
361+
));
362+
Ok((sync, indexer_clone))
351363
}
352364

353365
async fn build_watermark_contract_sync<T, S>(
@@ -358,7 +370,7 @@ impl OriginFactory {
358370
store: Arc<S>,
359371
advanced_log_meta: bool,
360372
broadcast_sender_enabled: bool,
361-
) -> eyre::Result<Arc<WatermarkContractSync<T>>>
373+
) -> eyre::Result<(Arc<WatermarkContractSync<T>>, SequenceIndexer<T>)>
362374
where
363375
T: Indexable + Debug,
364376
SequenceIndexer<T>: TryFromWithMetrics<ChainConf>,
@@ -367,12 +379,14 @@ impl OriginFactory {
367379
let indexer =
368380
SequenceIndexer::<T>::try_from_with_metrics(chain_conf, metrics, advanced_log_meta)
369381
.await?;
370-
Ok(Arc::new(ContractSync::new(
382+
let indexer_clone = indexer.clone();
383+
let sync = Arc::new(ContractSync::new(
371384
domain.clone(),
372385
store.clone() as WatermarkLogStore<_>,
373386
indexer,
374387
sync_metrics.clone(),
375388
broadcast_sender_enabled,
376-
)))
389+
));
390+
Ok((sync, indexer_clone))
377391
}
378392
}

0 commit comments

Comments
 (0)