Skip to content

Commit d284ce0

Browse files
authored
fix(omni-relayer): added timeout to redis read related queries (#530)
* fix: added timeout to redis read related queries * chore: bumped relayer version * refactor: use `set_response_timeout` during connection manager construction
1 parent b27bc9c commit d284ce0

File tree

8 files changed

+50
-15
lines changed

8 files changed

+50
-15
lines changed

omni-relayer/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

omni-relayer/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "omni-relayer"
3-
version = "0.5.11"
3+
version = "0.5.12"
44
edition = "2024"
55
resolver = "2"
66
repository = "https://github.com/Near-One/omni-bridge"

omni-relayer/example-devnet-config.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ url = "redis://127.0.0.1/"
33
sleep_time_after_events_process_secs = 1
44
query_retry_attempts = 10
55
query_retry_sleep_secs = 1
6+
query_timeout_secs = 5
67
fee_retry_base_secs = 1.2
78
fee_retry_max_sleep_secs = 3600 # 60 * 60
89
keep_transfers_for_secs = 604800 # 60 * 60 * 24 * 7

omni-relayer/example-mainnet-config.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ url = "redis://127.0.0.1/"
33
sleep_time_after_events_process_secs = 1
44
query_retry_attempts = 10
55
query_retry_sleep_secs = 1
6+
query_timeout_secs = 5
67
fee_retry_base_secs = 1.2
78
fee_retry_max_sleep_secs = 3600 # 60 * 60
89
keep_transfers_for_secs = 604800 # 60 * 60 * 24 * 7

omni-relayer/example-testnet-config.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ url = "redis://127.0.0.1/"
33
sleep_time_after_events_process_secs = 1
44
query_retry_attempts = 10
55
query_retry_sleep_secs = 1
6+
query_timeout_secs = 5
67
fee_retry_base_secs = 1.2
78
fee_retry_max_sleep_secs = 3600 # 60 * 60
89
keep_transfers_for_secs = 604800 # 60 * 60 * 24 * 7

omni-relayer/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ pub struct Redis {
176176
pub sleep_time_after_events_process_secs: u64,
177177
pub query_retry_attempts: u64,
178178
pub query_retry_sleep_secs: u64,
179+
pub query_timeout_secs: u64,
179180
pub fee_retry_base_secs: Decimal,
180181
pub fee_retry_max_sleep_secs: i64,
181182
pub keep_transfers_for_secs: i64,

omni-relayer/src/main.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,20 @@ fn init_logging(network: Network) -> Result<()> {
9494
Ok(())
9595
}
9696

97+
async fn build_redis_connection_manager(
98+
config: &config::Config,
99+
) -> Result<redis::aio::ConnectionManager> {
100+
let redis_client = redis::Client::open(config.redis.url.clone())?;
101+
let redis_connection_manager = redis::aio::ConnectionManager::new_with_config(
102+
redis_client,
103+
redis::aio::ConnectionManagerConfig::new().set_response_timeout(
104+
tokio::time::Duration::from_secs(config.redis.query_timeout_secs),
105+
),
106+
)
107+
.await?;
108+
Ok(redis_connection_manager)
109+
}
110+
97111
#[tokio::main]
98112
async fn main() -> Result<()> {
99113
dotenv::dotenv().ok();
@@ -111,8 +125,9 @@ async fn main() -> Result<()> {
111125

112126
init_logging(config.near.network).context("Failed to initialize logging")?;
113127

114-
let redis_client = redis::Client::open(config.redis.url.clone())?;
115-
let redis_connection_manager = redis::aio::ConnectionManager::new(redis_client.clone()).await?;
128+
let redis_connection_manager = build_redis_connection_manager(&config)
129+
.await
130+
.context("Failed to create Redis connection manager")?;
116131
let jsonrpc_client = near_jsonrpc_client::JsonRpcClient::connect(config.near.rpc_url.clone());
117132

118133
let near_omni_signer = startup::near::get_signer(&config, config::NearSignerType::Omni)?;

omni-relayer/src/utils/redis.rs

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -115,23 +115,39 @@ pub async fn get_events(
115115
key: String,
116116
) -> Option<Vec<(String, String)>> {
117117
for _ in 0..config.redis.query_retry_attempts {
118-
if let Ok(mut iter) = redis_connection_manager
118+
let mut iter = match redis_connection_manager
119119
.hscan::<String, (String, String)>(key.clone())
120120
.await
121121
{
122-
let mut events = Vec::new();
123-
124-
while let Some(event) = iter.next_item().await {
125-
events.push(event);
122+
Ok(iter) => iter,
123+
Err(err) => {
124+
warn!("Redis hscan failed: {err:?}");
125+
tokio::time::sleep(tokio::time::Duration::from_secs(
126+
config.redis.query_retry_sleep_secs,
127+
))
128+
.await;
129+
continue;
130+
}
131+
};
132+
133+
let mut events = Vec::new();
134+
loop {
135+
match tokio::time::timeout(
136+
tokio::time::Duration::from_secs(config.redis.query_timeout_secs),
137+
iter.next_item(),
138+
)
139+
.await
140+
{
141+
Ok(Some(event)) => events.push(event),
142+
Ok(None) => break,
143+
Err(_) => {
144+
warn!("Redis hscan iteration timed out");
145+
break;
146+
}
126147
}
127-
128-
return Some(events);
129148
}
130149

131-
tokio::time::sleep(tokio::time::Duration::from_secs(
132-
config.redis.query_retry_sleep_secs,
133-
))
134-
.await;
150+
return Some(events);
135151
}
136152

137153
warn!("Failed to get events from redis db");

0 commit comments

Comments
 (0)